Skip to content

RabbitRouter

faststream.rabbit.RabbitRouter #

RabbitRouter(
    prefix: str = "",
    handlers: Sequence[RabbitRoute] = (),
    *,
    dependencies: Sequence[Depends] = (),
    middlewares: Optional[
        Sequence[
            Callable[
                [aio_pika.IncomingMessage], BaseMiddleware
            ]
        ]
    ] = None,
    parser: Optional[
        CustomParser[
            aio_pika.IncomingMessage, RabbitMessage
        ]
    ] = None,
    decoder: Optional[CustomDecoder[RabbitMessage]] = None
)

Bases: RabbitRouter

A class representing a RabbitMQ router for publishing messages.

METHOD DESCRIPTION
_get_publisher_key

Returns the key for a given Publisher object

_update_publisher_prefix

Updates the prefix of a given Publisher object

publisher

Publishes a message to RabbitMQ

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/rabbit/router.py
"""

_publishers: Dict[int, Publisher]

@staticmethod
def _get_publisher_key(publisher: Publisher) -> int:
    """Get the publisher key.

    Args:
        publisher: The publisher object.

    Returns:

prefix instance-attribute #

prefix: str = prefix

include_router #

include_router(
    router: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[PublisherKeyType, MsgType]

RETURNS DESCRIPTION
None

None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/router.py
def include_router(self, router: "BrokerRouter[PublisherKeyType, MsgType]") -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    for h in router._handlers:
        self.subscriber(*h.args, **h.kwargs)(h.call)

    for p in router._publishers.values():
        p = self._update_publisher_prefix(self.prefix, p)
        key = self._get_publisher_key(p)
        self._publishers[key] = self._publishers.get(key, p)

include_routers #

include_routers(
    *routers: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes routers in the object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[PublisherKeyType, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/router.py
def include_routers(
    self, *routers: "BrokerRouter[PublisherKeyType, MsgType]"
) -> None:
    """Includes routers in the object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    for r in routers:
        self.include_router(r)

publisher #

publisher(
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    persist: bool = False,
    reply_to: Optional[str] = None,
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    priority: Optional[int] = None,
    **message_kwargs: Any
) -> Publisher

Publishes a message to a RabbitMQ queue or exchange.

PARAMETER DESCRIPTION
queue

The RabbitMQ queue to publish the message to. Can be either a RabbitQueue object or a string representing the queue name.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

The RabbitMQ exchange to publish the message to. Can be either a RabbitExchange object, a string representing the exchange name, or None.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

The routing key to use when publishing the message.

TYPE: str DEFAULT: ''

mandatory

Whether the message is mandatory or not.

TYPE: bool DEFAULT: True

immediate

Whether the message should be delivered immediately or not.

TYPE: bool DEFAULT: False

timeout

The timeout for the publish operation.

TYPE: TimeoutType DEFAULT: None

persist

Whether the message should be persisted or not.

TYPE: bool DEFAULT: False

reply_to

The reply-to address for the message.

TYPE: Optional[str] DEFAULT: None

title

The title of the message (AsyncAPI information).

TYPE: Optional[str] DEFAULT: None

description

The description of the message (AsyncAPI information).

TYPE: Optional[str] DEFAULT: None

**message_kwargs

Additional keyword arguments to include in the message.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Publisher

The Publisher object used to publish the message.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/rabbit/router.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    persist: bool = False,
    reply_to: Optional[str] = None,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    priority: Optional[int] = None,
    **message_kwargs: Any,
) -> Publisher:
    """Publishes a message to a RabbitMQ queue or exchange.

    Args:
        queue: The RabbitMQ queue to publish the message to. Can be either a RabbitQueue object or a string representing the queue name.
        exchange: The RabbitMQ exchange to publish the message to. Can be either a RabbitExchange object, a string representing the exchange name, or None.
        routing_key: The routing key to use when publishing the message.
        mandatory: Whether the message is mandatory or not.
        immediate: Whether the message should be delivered immediately or not.
        timeout: The timeout for the publish operation.
        persist: Whether the message should be persisted or not.
        reply_to: The reply-to address for the message.
        title: The title of the message (AsyncAPI information).
        description: The description of the message (AsyncAPI information).
        **message_kwargs: Additional keyword arguments to include in the message.

    Returns:
        The Publisher object used to publish the message.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    new_publisher = self._update_publisher_prefix(
        self.prefix,
        Publisher(
            queue=RabbitQueue.validate(queue),
            exchange=RabbitExchange.validate(exchange),
            routing_key=routing_key,
            mandatory=mandatory,
            immediate=immediate,
            timeout=timeout,
            persist=persist,
            reply_to=reply_to,
            priority=priority,
            message_kwargs=message_kwargs,
            title=title,
            _description=description,
            _schema=schema,
        ),
    )
    key = self._get_publisher_key(new_publisher)
    publisher = self._publishers[key] = self._publishers.get(key, new_publisher)
    return publisher

subscriber #

subscriber(
    queue: Union[str, RabbitQueue],
    exchange: Union[str, RabbitExchange, None] = None,
    *,
    consume_args: Optional[AnyDict] = None,
    dependencies: Sequence[Depends] = (),
    filter: Filter[RabbitMessage] = default_filter,
    parser: Optional[
        CustomParser[
            aio_pika.IncomingMessage, RabbitMessage
        ]
    ] = None,
    decoder: Optional[CustomDecoder[RabbitMessage]] = None,
    middlewares: Optional[
        Sequence[
            Callable[
                [aio_pika.IncomingMessage], BaseMiddleware
            ]
        ]
    ] = None,
    retry: Union[bool, int] = False,
    title: Optional[str] = None,
    description: Optional[str] = None,
    **__service_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        aio_pika.IncomingMessage,
        P_HandlerParams,
        T_HandlerReturn,
    ],
]
Source code in faststream/rabbit/router.py
    """Updates the publisher prefix.

    Args:
        prefix (str): The prefix to be added to the publisher's queue name.
        publisher (Publisher): The publisher object to be updated.

    Returns:
        Publisher: The updated publisher object.

    Note:
        This function is intended to be used as a decorator.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    publisher.queue = model_copy(
        publisher.queue, update={"name": prefix + publisher.queue.name}
    )
    return publisher

@override
def publisher(  # type: ignore[override]
    self,
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,

Last update: 2023-11-13