AioPikaFastProducer

faststream.rabbit.producer.AioPikaFastProducer

AioPikaFastProducer(
    channel: aio_pika.RobustChannel,
    declarer: RabbitDeclarer,
    parser: Optional[
        AsyncCustomParser[
            aio_pika.IncomingMessage, RabbitMessage
        ]
    ],
    decoder: Optional[AsyncCustomDecoder[RabbitMessage]],
)

A producer class that publishes messages to RabbitMQ through aio-pika.

METHOD DESCRIPTION
publish

Publishes a message to a queue or exchange.

_publish

Publishes a message to an exchange.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Initialize an AioPikaFastProducer instance.

PARAMETER DESCRIPTION
channel

The aio_pika.RobustChannel through which messages are published.

TYPE: RobustChannel

declarer

The RabbitDeclarer that holds the declared queues, including the reply queue used for RPC.

TYPE: RabbitDeclarer

parser

An optional custom parser for incoming reply messages. When None, AioPikaParser.parse_message is used.

TYPE: Optional[AsyncCustomParser[IncomingMessage, RabbitMessage]]

decoder

An optional custom decoder for parsed reply messages. When None, AioPikaParser.decode_message is used.

TYPE: Optional[AsyncCustomDecoder[RabbitMessage]]

Source code in faststream/rabbit/producer.py
def __init__(
    self,
    channel: aio_pika.RobustChannel,
    declarer: RabbitDeclarer,
    parser: Optional[AsyncCustomParser[aio_pika.IncomingMessage, RabbitMessage]],
    decoder: Optional[AsyncCustomDecoder[RabbitMessage]],
):
    """Initialize a class instance.

    Args:
        channel: The aio_pika.RobustChannel object.
        declarer: The RabbitDeclarer object.
        parser: An optional AsyncCustomParser object for parsing incoming messages.
        decoder: An optional AsyncCustomDecoder object for decoding incoming messages.
    """
    self._channel = channel
    self.declarer = declarer
    self._parser = resolve_custom_func(parser, AioPikaParser.parse_message)
    self._decoder = resolve_custom_func(decoder, AioPikaParser.decode_message)
    self._rpc_lock = anyio.Lock()
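
The constructor above falls back to AioPikaParser.parse_message and AioPikaParser.decode_message when no custom parser or decoder is passed. The fallback idea can be sketched as follows. This is a minimal illustration only: `pick_func`, `default_parser`, and `upper_parser` are hypothetical names, and the real resolve_custom_func may do more than choose between the two callables.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical, simplified parser type: raw bytes in, text out.
Parser = Callable[[bytes], Awaitable[str]]


async def default_parser(raw: bytes) -> str:
    """Default behaviour (assumed for this sketch): decode the body as UTF-8."""
    return raw.decode("utf-8")


async def upper_parser(raw: bytes) -> str:
    """A custom parser that a caller might supply."""
    return raw.decode("utf-8").upper()


def pick_func(custom: Optional[Parser], default: Parser) -> Parser:
    """Return the custom callable when one is given, otherwise the default."""
    return default if custom is None else custom


async def demo() -> Tuple[str, str]:
    with_default = pick_func(None, default_parser)
    with_custom = pick_func(upper_parser, default_parser)
    return await with_default(b"hi"), await with_custom(b"hi")


result = asyncio.run(demo())
print(result)
```

Passing None keeps the default behaviour, so callers only need to supply a parser or decoder when the replies use a non-default format.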

declarer instance-attribute

declarer: RabbitDeclarer = declarer

publish async

publish(
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    **message_kwargs: Any
) -> Union[
    aiormq.abc.ConfirmationFrameType, SendableMessage
]

Publish a message to a RabbitMQ queue or exchange and, when rpc is True, wait for the reply.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: AioPikaSendableMessage DEFAULT: ''

queue

The queue to publish the message to. Its routing key is used when routing_key is empty.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

The exchange to publish the message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

The routing key for the message. When set, it takes precedence over the routing key derived from queue.

TYPE: str DEFAULT: ''

mandatory

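Whether to set the AMQP mandatory flag, which makes the broker return the message to the publisher if it cannot be routed to any queue.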

TYPE: bool DEFAULT: True

immediate

Whether to set the AMQP immediate flag, which asks the broker to return the message if it cannot be delivered to a consumer immediately.

TYPE: bool DEFAULT: False

timeout

The timeout for the publish operation itself (separate from rpc_timeout).

TYPE: TimeoutType DEFAULT: None

rpc

Whether to send the message as an RPC request and wait for the reply.

TYPE: bool DEFAULT: False

rpc_timeout

The maximum time to wait for the RPC reply. Used only when rpc is True.

TYPE: Optional[float] DEFAULT: 30.0

raise_timeout

Whether to raise an exception when no RPC reply arrives within rpc_timeout. When False, None is returned instead.

TYPE: bool DEFAULT: False

persist

Whether to mark the message as persistent so that the broker can keep it across restarts.

TYPE: bool DEFAULT: False

reply_to

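The name of the queue to which a reply should be sent. Must be None when rpc is True, because the producer then uses its own reply queue.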

TYPE: Optional[str] DEFAULT: None

**message_kwargs

Additional keyword arguments for the message.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Union[ConfirmationFrameType, SendableMessage]

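The result of the underlying publish call (a confirmation frame) when rpc is False. When rpc is True, the parsed and decoded reply, or None if no reply arrives within rpc_timeout and raise_timeout is False.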

RAISES DESCRIPTION
WRONG_PUBLISH_ARGS

If reply_to is not None when rpc is True.
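
The two argument rules described above (an explicit routing_key wins over the queue's routing key, and reply_to cannot be combined with rpc=True) can be sketched in isolation. This is a simplified model, not the library code: `Queue`, `WrongPublishArgs`, and `resolve_routing_key` are hypothetical stand-ins, and the assumption that a queue's routing key falls back to its name is made for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Union


class WrongPublishArgs(ValueError):
    """Hypothetical stand-in for the WRONG_PUBLISH_ARGS error."""


@dataclass
class Queue:
    """Hypothetical, simplified queue: a name plus an optional routing key."""

    name: str
    routing_key: str = ""

    @property
    def routing(self) -> str:
        # Assumption for this sketch: fall back to the queue name.
        return self.routing_key or self.name


def resolve_routing_key(
    queue: Union[Queue, str] = "",
    routing_key: str = "",
    rpc: bool = False,
    reply_to: Optional[str] = None,
) -> str:
    """Validate the rpc/reply_to combination and pick the routing key."""
    if rpc and reply_to is not None:
        raise WrongPublishArgs("reply_to cannot be set when rpc is True")
    q = Queue(queue) if isinstance(queue, str) else queue
    # An explicit routing_key takes precedence over the queue's routing key.
    return routing_key or q.routing or ""


print(resolve_routing_key("orders"))
print(resolve_routing_key(Queue("orders", routing_key="orders.eu")))
print(resolve_routing_key("orders", routing_key="priority"))
```

Checking the arguments before anything is sent means that an invalid combination fails fast, without touching the broker.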

Source code in faststream/rabbit/producer.py
async def publish(
    self,
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    **message_kwargs: Any,
) -> Union[aiormq.abc.ConfirmationFrameType, SendableMessage]:
    """Publish a message to a RabbitMQ queue.

    Args:
        message (AioPikaSendableMessage): The message to be published.
        queue (Union[RabbitQueue, str]): The queue to publish the message to.
        exchange (Union[RabbitExchange, str, None]): The exchange to publish the message to.
        routing_key (str): The routing key for the message.
        mandatory (bool): Whether the message is mandatory.
        immediate (bool): Whether the message should be delivered immediately.
        timeout (TimeoutType): The timeout for the operation.
        rpc (bool): Whether the message is for RPC.
        rpc_timeout (Optional[float]): The timeout for RPC.
        raise_timeout (bool): Whether to raise an exception on timeout.
        persist (bool): Whether the message should be persisted.
        reply_to (Optional[str]): The reply-to queue for RPC.
        **message_kwargs (Any): Additional keyword arguments for the message.

    Returns:
        Union[aiormq.abc.ConfirmationFrameType, SendableMessage]: The result of the publish operation.

    Raises:
        WRONG_PUBLISH_ARGS: If reply_to is not None when rpc is True.
    """
    p_queue = RabbitQueue.validate(queue)

    context: AsyncContextManager[
        Optional[MemoryObjectReceiveStream[aio_pika.IncomingMessage]]
    ]
    if rpc is True:
        if reply_to is not None:
            raise WRONG_PUBLISH_ARGS
        else:
            context = _RPCCallback(
                self._rpc_lock,
                self.declarer.queues[RABBIT_REPLY],
            )
    else:
        context = _fake_context()

    async with context as response_queue:
        r = await self._publish(
            message=message,
            exchange=exchange,
            routing_key=routing_key or p_queue.routing or "",
            mandatory=mandatory,
            immediate=immediate,
            timeout=timeout,
            persist=persist,
            reply_to=RABBIT_REPLY if response_queue else reply_to,
            **message_kwargs,
        )

        if response_queue is None:
            return r

        else:
            msg: Optional[aio_pika.IncomingMessage] = None
            with timeout_scope(rpc_timeout, raise_timeout):
                msg = await response_queue.receive()

            if msg:  # pragma: no branch
                return await self._decoder(await self._parser(msg))

    return None
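
The RPC branch of the method above follows a common pattern: take a lock, send the request, then wait for a single reply with a timeout. A minimal sketch of that pattern using only asyncio is shown below. It is not the library implementation: `rpc_call` and the `send` callables are hypothetical, and asyncio.Lock, asyncio.Queue, and asyncio.wait_for stand in for the anyio lock, the reply stream, and timeout_scope.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple


async def rpc_call(
    lock: asyncio.Lock,
    replies: "asyncio.Queue[str]",
    send: Callable[[], Awaitable[None]],
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[str]:
    """Send a request and wait for one reply, honouring the timeout flags."""
    async with lock:  # only one request waits on the shared reply queue at a time
        await send()
        try:
            # rpc_timeout=None means "wait indefinitely".
            return await asyncio.wait_for(replies.get(), rpc_timeout)
        except asyncio.TimeoutError:
            if raise_timeout:
                raise
            return None  # timed out quietly


async def demo() -> Tuple[Optional[str], Optional[str]]:
    lock = asyncio.Lock()
    replies: "asyncio.Queue[str]" = asyncio.Queue()

    async def send_with_reply() -> None:
        # Simulate a consumer that answers at once.
        replies.put_nowait("pong")

    async def send_without_reply() -> None:
        # Simulate a consumer that never answers.
        return None

    answered = await rpc_call(lock, replies, send_with_reply, rpc_timeout=1.0)
    silent = await rpc_call(lock, replies, send_without_reply, rpc_timeout=0.05)
    return answered, silent


outcome = asyncio.run(demo())
print(outcome)
```

With raise_timeout left at False the caller has to handle a None result. Setting it to True turns a missing reply into an exception instead.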

Last update: 2023-11-13