Skip to content

AioPikaFastProducer

faststream.rabbit.publisher.producer.AioPikaFastProducer #

AioPikaFastProducer(*, declarer, parser, decoder)

Bases: ProducerProto

A producer class for fast message publishing using aio-pika.

Source code in faststream/rabbit/publisher/producer.py
def __init__(
    self,
    *,
    declarer: "RabbitDeclarer",
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Set up the producer.

    Args:
        declarer: Stored as ``self.declarer``; the publishing methods use it
            to declare the reply queue.
        parser: Optional custom parser. It is combined with the default
            ``AioPikaParser.parse_message`` through ``resolve_custom_func``.
        decoder: Optional custom decoder. It is combined with the default
            ``AioPikaParser.decode_message`` through ``resolve_custom_func``.
    """
    self.declarer = declarer

    # Lock handed to every ``_RPCCallback`` created by this producer.
    self._rpc_lock = anyio.Lock()

    # A single parser instance supplies both default callables.
    fallback = AioPikaParser()
    self._parser = resolve_custom_func(parser, fallback.parse_message)
    self._decoder = resolve_custom_func(decoder, fallback.decode_message)

declarer instance-attribute #

declarer = declarer

publish async #

publish(message, exchange=None, *, correlation_id='', routing_key='', mandatory=True, immediate=False, timeout=None, rpc=False, rpc_timeout=30.0, raise_timeout=False, persist=False, reply_to=None, headers=None, content_type=None, content_encoding=None, priority=None, expiration=None, message_id=None, timestamp=None, message_type=None, user_id=None, app_id=None)

Publish a message to a RabbitMQ queue.

Source code in faststream/rabbit/publisher/producer.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "AioPikaSendableMessage",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    correlation_id: str = "",
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    priority: Optional[int] = None,
    expiration: Optional["DateType"] = None,
    message_id: Optional[str] = None,
    timestamp: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
    app_id: Optional[str] = None,
) -> Optional[Any]:
    """Publish a message and, in RPC mode, wait for its reply.

    With ``rpc=False`` the message is handed to ``self._publish`` and that
    call's result is returned.

    With ``rpc=True`` the ``RABBIT_REPLY`` queue is declared, the message is
    published with ``reply_to`` set to ``RABBIT_REPLY.name``, and one message
    is awaited from the response stream inside
    ``timeout_scope(rpc_timeout, raise_timeout)``. A received reply is passed
    through the configured parser and decoder and the decoded value is
    returned.

    Returns:
        The ``self._publish`` result (non-RPC), the decoded reply (RPC), or
        ``None`` when no reply message was obtained.

    Raises:
        WRONG_PUBLISH_ARGS: If ``rpc`` is true and ``reply_to`` is also given.
    """
    # An explicit reply target cannot be combined with the RPC mode.
    if rpc and reply_to is not None:
        raise WRONG_PUBLISH_ARGS

    context: AsyncContextManager[
        Optional[MemoryObjectReceiveStream[IncomingMessage]]
    ] = (
        _RPCCallback(
            self._rpc_lock,
            await self.declarer.declare_queue(RABBIT_REPLY),
        )
        if rpc
        else fake_context()
    )

    async with context as response_queue:
        # ``response_queue`` is None when no reply stream was set up.
        effective_reply_to = (
            reply_to if response_queue is None else RABBIT_REPLY.name
        )

        publish_result = await self._publish(
            message=message,
            exchange=exchange,
            routing_key=routing_key,
            mandatory=mandatory,
            immediate=immediate,
            timeout=timeout,
            persist=persist,
            reply_to=effective_reply_to,
            headers=headers,
            content_type=content_type,
            content_encoding=content_encoding,
            priority=priority,
            correlation_id=correlation_id,
            expiration=expiration,
            message_id=message_id,
            timestamp=timestamp,
            message_type=message_type,
            user_id=user_id,
            app_id=app_id,
        )

        if response_queue is None:
            return publish_result

        # Stays None if the scope exits before a reply is received.
        reply: Optional[IncomingMessage] = None
        with timeout_scope(rpc_timeout, raise_timeout):
            reply = await response_queue.receive()

        if reply:  # pragma: no branch
            parsed = await self._parser(reply)
            return await self._decoder(parsed)

    return None

request async #

request(message, exchange=None, *, correlation_id='', routing_key='', mandatory=True, immediate=False, timeout=None, persist=False, headers=None, content_type=None, content_encoding=None, priority=None, expiration=None, message_id=None, timestamp=None, message_type=None, user_id=None, app_id=None)

Publish a message to RabbitMQ and wait for the reply message.

Source code in faststream/rabbit/publisher/producer.py
@override
async def request(  # type: ignore[override]
    self,
    message: "AioPikaSendableMessage",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    correlation_id: str = "",
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: Optional[float] = None,
    persist: bool = False,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    priority: Optional[int] = None,
    expiration: Optional["DateType"] = None,
    message_id: Optional[str] = None,
    timestamp: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
    app_id: Optional[str] = None,
) -> "IncomingMessage":
    """Publish a message and return the raw reply received for it.

    The ``RABBIT_REPLY`` queue is declared and the message is published with
    ``reply_to`` set to ``RABBIT_REPLY.name``. Both the publish and the wait
    for the reply run inside a single ``anyio.fail_after(timeout)`` scope;
    ``timeout`` is also forwarded to ``self._publish``.

    Returns:
        The message received from the response stream, without parsing or
        decoding.
    """
    reply_queue = await self.declarer.declare_queue(RABBIT_REPLY)

    async with _RPCCallback(self._rpc_lock, reply_queue) as response_queue:
        # One deadline covers publishing and waiting for the reply.
        with anyio.fail_after(timeout):
            await self._publish(
                message=message,
                exchange=exchange,
                routing_key=routing_key,
                mandatory=mandatory,
                immediate=immediate,
                timeout=timeout,
                persist=persist,
                reply_to=RABBIT_REPLY.name,
                headers=headers,
                content_type=content_type,
                content_encoding=content_encoding,
                priority=priority,
                correlation_id=correlation_id,
                expiration=expiration,
                message_id=message_id,
                timestamp=timestamp,
                message_type=message_type,
                user_id=user_id,
                app_id=app_id,
            )
            reply = await response_queue.receive()
            return reply