Skip to content

AioPikaFastProducer

faststream.rabbit.producer.AioPikaFastProducer #

AioPikaFastProducer(
    channel: RobustChannel,
    declarer: RabbitDeclarer,
    parser: Optional[
        AsyncCustomParser[IncomingMessage, RabbitMessage]
    ],
    decoder: Optional[AsyncCustomDecoder[RabbitMessage]],
)

A class for fast producing messages using aio-pika.

METHOD DESCRIPTION
publish

Publishes a message to a queue or exchange.

_publish

Publishes a message to an exchange.

Note: This docstring is incomplete as it is difficult to understand the purpose and functionality of the class and its methods without more context.

Initialize a class instance.

PARAMETER DESCRIPTION
channel

The aio_pika.RobustChannel object.

TYPE: RobustChannel

declarer

The RabbitDeclarer object.

TYPE: RabbitDeclarer

parser

An optional AsyncCustomParser object for parsing incoming messages.

TYPE: Optional[AsyncCustomParser[IncomingMessage, RabbitMessage]]

decoder

An optional AsyncCustomDecoder object for decoding incoming messages.

TYPE: Optional[AsyncCustomDecoder[RabbitMessage]]

Source code in faststream/rabbit/producer.py
def __init__(
    self,
    channel: aio_pika.RobustChannel,
    declarer: RabbitDeclarer,
    parser: Optional[AsyncCustomParser[aio_pika.IncomingMessage, RabbitMessage]],
    decoder: Optional[AsyncCustomDecoder[RabbitMessage]],
) -> None:
    """Set up the producer with its channel, declarer and message handlers.

    Args:
        channel: The aio_pika.RobustChannel kept on the instance as ``_channel``.
        declarer: The RabbitDeclarer kept on the instance as ``declarer``.
        parser: Optional custom parser; it is combined with
            ``AioPikaParser.parse_message`` through ``resolve_custom_func``.
        decoder: Optional custom decoder; it is combined with
            ``AioPikaParser.decode_message`` through ``resolve_custom_func``.

    """
    # Built-in handlers that the optional custom ones are resolved against.
    default_parser = AioPikaParser.parse_message
    default_decoder = AioPikaParser.decode_message

    self._channel = channel
    self.declarer = declarer

    self._parser = resolve_custom_func(parser, default_parser)
    self._decoder = resolve_custom_func(decoder, default_decoder)

    # Lock handed to the RPC callback context when publishing with rpc=True.
    self._rpc_lock = anyio.Lock()

declarer instance-attribute #

publish async #

publish(
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    **message_kwargs: Any
) -> Union[ConfirmationFrameType, Any]

Publish a message to a RabbitMQ queue.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: AioPikaSendableMessage DEFAULT: ''

queue

The queue to publish the message to. Its routing key is used when routing_key is empty.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

The exchange to publish the message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

The routing key for the message.

TYPE: str DEFAULT: ''

mandatory

Whether the message is mandatory.

TYPE: bool DEFAULT: True

immediate

Whether the message should be delivered immediately.

TYPE: bool DEFAULT: False

timeout

The timeout for the operation.

TYPE: TimeoutType DEFAULT: None

rpc

Whether the message is for RPC.

TYPE: bool DEFAULT: False

rpc_timeout

The timeout for RPC.

TYPE: Optional[float] DEFAULT: 30.0

raise_timeout

Whether to raise an exception on timeout.

TYPE: bool DEFAULT: False

persist

Whether the message should be persisted.

TYPE: bool DEFAULT: False

reply_to

The reply-to queue for the message. Must be None when rpc is True, because the RPC reply queue is then set automatically.

TYPE: Optional[str] DEFAULT: None

**message_kwargs

Additional keyword arguments for the message.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Union[ConfirmationFrameType, Any]

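Union[aiormq.abc.ConfirmationFrameType, Any]: The result of the underlying publish call when rpc is False; when rpc is True, the parsed and decoded reply message, or None if no reply was received.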

RAISES DESCRIPTION
WRONG_PUBLISH_ARGS

If reply_to is not None when rpc is True.

Source code in faststream/rabbit/producer.py
async def publish(
    self,
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    **message_kwargs: Any,
) -> Union[aiormq.abc.ConfirmationFrameType, Any]:
    """Publish a message and, in RPC mode, wait for the reply.

    Args:
        message: The message to be published.
        queue: The queue to publish to; its routing key is the fallback
            when ``routing_key`` is empty.
        exchange: The exchange to publish the message to.
        routing_key: The routing key for the message; takes precedence over
            the queue's routing key.
        mandatory: Forwarded unchanged to ``_publish``.
        immediate: Forwarded unchanged to ``_publish``.
        timeout: Forwarded unchanged to ``_publish``.
        rpc: When exactly ``True``, the reply is awaited and returned.
        rpc_timeout: Timeout handed to ``timeout_scope`` while waiting for
            the RPC reply.
        raise_timeout: Handed to ``timeout_scope`` together with
            ``rpc_timeout``.
        persist: Forwarded unchanged to ``_publish``.
        reply_to: The reply-to value for the message; must be ``None`` when
            ``rpc`` is True, since ``RABBIT_REPLY`` is used instead.
        **message_kwargs: Additional keyword arguments forwarded to
            ``_publish``.

    Returns:
        The result of ``_publish`` when not in RPC mode; otherwise the parsed
        and decoded reply message, or ``None`` if no reply was received.

    Raises:
        WRONG_PUBLISH_ARGS: If reply_to is not None when rpc is True.

    """
    # Validation runs first so its errors take precedence over the RPC check.
    validated_queue = RabbitQueue.validate(queue)

    rpc_context: AsyncContextManager[
        Optional[MemoryObjectReceiveStream[aio_pika.IncomingMessage]]
    ]
    if rpc is not True:
        # Non-RPC publish: the context yields no reply stream.
        rpc_context = fake_context()
    elif reply_to is None:
        rpc_context = _RPCCallback(
            self._rpc_lock,
            self.declarer.queues[RABBIT_REPLY],
        )
    else:
        # An explicit reply_to conflicts with the RPC reply queue.
        raise WRONG_PUBLISH_ARGS

    async with rpc_context as reply_stream:
        effective_routing_key = routing_key or validated_queue.routing or ""
        effective_reply_to = RABBIT_REPLY if reply_stream else reply_to

        publish_result = await self._publish(
            message=message,
            exchange=exchange,
            routing_key=effective_routing_key,
            mandatory=mandatory,
            immediate=immediate,
            timeout=timeout,
            persist=persist,
            reply_to=effective_reply_to,
            **message_kwargs,
        )

        if reply_stream is None:
            return publish_result

        # RPC mode: wait for the reply inside the timeout scope.
        reply: Optional[aio_pika.IncomingMessage] = None
        with timeout_scope(rpc_timeout, raise_timeout):
            reply = await reply_stream.receive()

        if reply:  # pragma: no branch
            parsed_reply = await self._parser(reply)
            return await self._decoder(parsed_reply)

    return None