Skip to content

RabbitBroker

faststream.rabbit.broker.RabbitBroker #

RabbitBroker(
    url: Union[
        str, URL, None
    ] = "amqp://guest:guest@localhost:5672/",
    *,
    host: str = None,
    port: int = None,
    login: str = None,
    password: str = None,
    virtualhost: str = None,
    ssl_options: Optional[aio_pika.abc.SSLOptions] = None,
    client_properties: Optional[FieldTable] = None,
    max_consumers: Optional[int] = None,
    protocol: str = None,
    protocol_version: Optional[str] = "0.9.1",
    security: Optional[BaseSecurity] = None,
    **kwargs: Any
)

Bases: RabbitLoggingMixin, BrokerAsyncUsecase[IncomingMessage, RobustConnection]

A RabbitMQ broker for FastAPI applications.

This class extends the base BrokerAsyncUsecase and provides asynchronous support for RabbitMQ as a message broker.

PARAMETER DESCRIPTION
url

The RabbitMQ connection URL. Defaults to "amqp://guest:guest@localhost:5672/".

TYPE: Union[str, URL, None] DEFAULT: 'amqp://guest:guest@localhost:5672/'

max_consumers

Maximum number of consumers to limit message consumption. Defaults to None.

TYPE: Optional[int] DEFAULT: None

protocol

The protocol to use (e.g., "amqp"). Defaults to "amqp".

TYPE: str DEFAULT: None

protocol_version

The protocol version to use (e.g., "0.9.1"). Defaults to "0.9.1".

TYPE: Optional[str] DEFAULT: '0.9.1'

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Initialize the RabbitBroker.

PARAMETER DESCRIPTION
url

The RabbitMQ connection URL. Defaults to "amqp://guest:guest@localhost:5672/".

TYPE: Union[str, URL, None] DEFAULT: 'amqp://guest:guest@localhost:5672/'

max_consumers

Maximum number of consumers to limit message consumption. Defaults to None.

TYPE: Optional[int] DEFAULT: None

protocol

The protocol to use (e.g., "amqp"). Defaults to "amqp".

TYPE: str DEFAULT: None

protocol_version

The protocol version to use (e.g., "0.9.1"). Defaults to "0.9.1".

TYPE: Optional[str] DEFAULT: '0.9.1'

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in faststream/rabbit/broker.py
def __init__(
    self,
    url: Union[str, URL, None] = "amqp://guest:guest@localhost:5672/",
    *,
    # connection args
    host: Optional[str] = None,
    port: Optional[int] = None,
    login: Optional[str] = None,
    password: Optional[str] = None,
    virtualhost: Optional[str] = None,
    ssl_options: Optional[SSLOptions] = None,
    client_properties: Optional[FieldTable] = None,
    # broker args
    max_consumers: Optional[int] = None,
    protocol: Optional[str] = None,
    protocol_version: Optional[str] = "0.9.1",
    security: Optional[BaseSecurity] = None,
    **kwargs: Any,
) -> None:
    """
    Initialize the RabbitBroker.

    Args:
        url (Union[str, URL, None], optional): The RabbitMQ connection URL. Defaults to "amqp://guest:guest@localhost:5672/".
        max_consumers (Optional[int], optional): Maximum number of consumers to limit message consumption. Defaults to None.
        protocol (str, optional): The protocol to use (e.g., "amqp"). Defaults to "amqp".
        protocol_version (Optional[str], optional): The protocol version to use (e.g., "0.9.1"). Defaults to "0.9.1".
        **kwargs: Additional keyword arguments.
    """
    security_args = parse_security(security)

    if (ssl := kwargs.get("ssl")) or kwargs.get("ssl_context"):  # pragma: no cover
        warnings.warn(
            (
                f"\nRabbitMQ {'`ssl`' if ssl else '`ssl_context`'} option was deprecated and will be removed in 0.4.0"
                "\nPlease, use `security` with `BaseSecurity` or `SASLPlaintext` instead"
            ),
            DeprecationWarning,
            stacklevel=2,
        )

    amqp_url = build_url(
        url,
        host=host,
        port=port,
        login=security_args.get("login", login),
        password=security_args.get("password", password),
        virtualhost=virtualhost,
        ssl=security_args.get("ssl", kwargs.pop("ssl", False)),
        ssl_options=ssl_options,
        client_properties=client_properties,
    )

    super().__init__(
        url=str(amqp_url),
        protocol=amqp_url.scheme,
        protocol_version=protocol_version,
        security=security,
        ssl_context=security_args.get(
            "ssl_context", kwargs.pop("ssl_context", None)
        ),
        **kwargs,
    )

    # respect ascynapi_url argument scheme
    asyncapi_url = build_url(self.url)
    self.protocol = protocol or asyncapi_url.scheme
    self.virtual_host = asyncapi_url.path

    self._max_consumers = max_consumers

    self._channel = None
    self.declarer = None
    self._producer = None

declarer instance-attribute #

declarer: Optional[RabbitDeclarer] = None

dependencies instance-attribute #

dependencies: Sequence[Depends] = dependencies

description instance-attribute #

description = description

fmt property #

fmt: str

handlers instance-attribute #

handlers: Dict[int, Handler]

log_level instance-attribute #

log_level: int

logger instance-attribute #

logger: Optional[logging.Logger]

middlewares instance-attribute #

middlewares: Sequence[Callable[[MsgType], BaseMiddleware]]

protocol instance-attribute #

protocol = protocol or asyncapi_url.scheme

protocol_version instance-attribute #

protocol_version = protocol_version

security instance-attribute #

security = security

started instance-attribute #

started: bool = False

tags instance-attribute #

tags = tags

url instance-attribute #

url = asyncapi_url or url

virtual_host instance-attribute #

virtual_host = asyncapi_url.path

close async #

close(
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None

Closes the object.

PARAMETER DESCRIPTION
exc_type

The type of the exception being handled, if any.

TYPE: Optional[Type[BaseException]] DEFAULT: None

exc_val

The exception instance being handled, if any.

TYPE: Optional[BaseException] DEFAULT: None

exec_tb

The traceback of the exception being handled, if any.

TYPE: Optional[TracebackType] DEFAULT: None

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/core/asyncronous.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None:
    """Closes the object.

    Args:
        exc_type: The type of the exception being handled, if any.
        exc_val: The exception instance being handled, if any.
        exec_tb: The traceback of the exception being handled, if any.

    Returns:
        None

    Raises:
        NotImplementedError: If the method is not implemented.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    super()._abc_close(exc_type, exc_val, exec_tb)

    for h in self.handlers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exec_tb)

connect async #

connect(
    *args: Any, **kwargs: Any
) -> aio_pika.RobustConnection

Connect to the RabbitMQ server.

PARAMETER DESCRIPTION
*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
RobustConnection

aio_pika.RobustConnection: The RabbitMQ connection instance.

Source code in faststream/rabbit/broker.py
async def connect(self, *args: Any, **kwargs: Any) -> aio_pika.RobustConnection:
    """
    Connect to the RabbitMQ server.

    Args:
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        aio_pika.RobustConnection: The RabbitMQ connection instance.
    """
    connection = await super().connect(*args, **kwargs)
    for p in self._publishers.values():
        p._producer = self._producer
    return connection

declare_exchange async #

declare_exchange(
    exchange: RabbitExchange,
) -> aio_pika.RobustExchange

Declare a RabbitMQ exchange.

PARAMETER DESCRIPTION
exchange

The RabbitMQ exchange to declare.

TYPE: RabbitExchange

RETURNS DESCRIPTION
RobustExchange

aio_pika.RobustExchange: The declared RabbitMQ exchange.

RAISES DESCRIPTION
RuntimeError

If the declarer is not initialized in the connect method.

Source code in faststream/rabbit/broker.py
async def declare_exchange(
    self,
    exchange: RabbitExchange,
) -> aio_pika.RobustExchange:
    """
    Declare a RabbitMQ exchange.

    Args:
        exchange (RabbitExchange): The RabbitMQ exchange to declare.

    Returns:
        aio_pika.RobustExchange: The declared RabbitMQ exchange.

    Raises:
        RuntimeError: If the declarer is not initialized in the `connect` method.
    """
    assert (  # nosec B101
        self.declarer
    ), "Declarer should be initialized in `connect` method"
    return await self.declarer.declare_exchange(exchange)

declare_queue async #

declare_queue(queue: RabbitQueue) -> aio_pika.RobustQueue

Declare a RabbitMQ queue.

PARAMETER DESCRIPTION
queue

The RabbitMQ queue to declare.

TYPE: RabbitQueue

RETURNS DESCRIPTION
RobustQueue

aio_pika.RobustQueue: The declared RabbitMQ queue.

RAISES DESCRIPTION
RuntimeError

If the declarer is not initialized in the connect method.

Source code in faststream/rabbit/broker.py
async def declare_queue(
    self,
    queue: RabbitQueue,
) -> aio_pika.RobustQueue:
    """
    Declare a RabbitMQ queue.

    Args:
        queue (RabbitQueue): The RabbitMQ queue to declare.

    Returns:
        aio_pika.RobustQueue: The declared RabbitMQ queue.

    Raises:
        RuntimeError: If the declarer is not initialized in the `connect` method.
    """
    assert (  # nosec B101
        self.declarer
    ), "Declarer should be initialized in `connect` method"
    return await self.declarer.declare_queue(queue)

include_router #

include_router(router: BrokerRouter[Any, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[Any, MsgType]

RETURNS DESCRIPTION
None

None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/core/abc.py
def include_router(self, router: BrokerRouter[Any, MsgType]) -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    for r in router._handlers:
        self.subscriber(*r.args, **r.kwargs)(r.call)

    self._publishers.update(router._publishers)

include_routers #

include_routers(
    *routers: BrokerRouter[Any, MsgType]
) -> None

Includes routers in the current object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[Any, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: BrokerRouter[Any, MsgType]) -> None:
    """Includes routers in the current object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    for r in routers:
        self.include_router(r)

publish async #

publish(
    *args: Any, **kwargs: Any
) -> Union[
    aiormq.abc.ConfirmationFrameType, SendableMessage
]

Publish a message to the RabbitMQ broker.

PARAMETER DESCRIPTION
*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Union[ConfirmationFrameType, SendableMessage]

Union[aiormq.abc.ConfirmationFrameType, SendableMessage]: The confirmation frame or the response message.

Source code in faststream/rabbit/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    *args: Any,
    **kwargs: Any,
) -> Union[aiormq.abc.ConfirmationFrameType, SendableMessage]:
    """
    Publish a message to the RabbitMQ broker.

    Args:
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        Union[aiormq.abc.ConfirmationFrameType, SendableMessage]: The confirmation frame or the response message.
    """

    assert self._producer, "RabbitBroker channel is not started yet"  # nosec B101
    return await self._producer.publish(*args, **kwargs)

publisher #

publisher(
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    persist: bool = False,
    reply_to: Optional[str] = None,
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    priority: Optional[int] = None,
    **message_kwargs: Any
) -> Publisher

Define a message publisher.

PARAMETER DESCRIPTION
queue

The name of the RabbitMQ queue. Defaults to "".

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

The name of the RabbitMQ exchange. Defaults to None.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

The routing key for messages. Defaults to "".

TYPE: str DEFAULT: ''

mandatory

Whether the message is mandatory. Defaults to True.

TYPE: bool DEFAULT: True

immediate

Whether the message should be sent immediately. Defaults to False.

TYPE: bool DEFAULT: False

timeout

Timeout for message publishing. Defaults to None.

TYPE: TimeoutType DEFAULT: None

persist

Whether to persist messages. Defaults to False.

TYPE: bool DEFAULT: False

reply_to

The reply-to queue name. Defaults to None.

TYPE: Optional[str] DEFAULT: None

title

Title for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

description

Description for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

**message_kwargs

Additional message properties and content.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Publisher

A message publisher instance.

TYPE: Publisher

Source code in faststream/rabbit/broker.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    persist: bool = False,
    reply_to: Optional[str] = None,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    priority: Optional[int] = None,
    **message_kwargs: Any,
) -> Publisher:
    """
    Define a message publisher.

    Args:
        queue (Union[RabbitQueue, str], optional): The name of the RabbitMQ queue. Defaults to "".
        exchange (Union[RabbitExchange, str, None], optional): The name of the RabbitMQ exchange. Defaults to None.
        routing_key (str, optional): The routing key for messages. Defaults to "".
        mandatory (bool, optional): Whether the message is mandatory. Defaults to True.
        immediate (bool, optional): Whether the message should be sent immediately. Defaults to False.
        timeout (TimeoutType, optional): Timeout for message publishing. Defaults to None.
        persist (bool, optional): Whether to persist messages. Defaults to False.
        reply_to (Optional[str], optional): The reply-to queue name. Defaults to None.
        title (Optional[str]): Title for AsyncAPI docs.
        description (Optional[str]): Description for AsyncAPI docs.
        **message_kwargs (Any): Additional message properties and content.

    Returns:
        Publisher: A message publisher instance.
    """
    q, ex = RabbitQueue.validate(queue), RabbitExchange.validate(exchange)

    publisher = Publisher(
        title=title,
        queue=q,
        exchange=ex,
        routing_key=routing_key,
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        priority=priority,
        message_kwargs=message_kwargs,
        _description=description,
        _schema=schema,
        virtual_host=self.virtual_host,
    )

    key = publisher._get_routing_hash()
    publisher = self._publishers.get(key, publisher)
    super().publisher(key, publisher)
    if self._producer is not None:
        publisher._producer = self._producer
    return publisher

start async #

start() -> None

Start the RabbitMQ broker.

RAISES DESCRIPTION
RuntimeError

If the declarer is not initialized in the connect method.

Source code in faststream/rabbit/broker.py
async def start(self) -> None:
    """
    Start the RabbitMQ broker.

    Raises:
        RuntimeError: If the declarer is not initialized in the `connect` method.
    """
    context.set_local(
        "log_context",
        self._get_log_context(None, RabbitQueue(""), RabbitExchange("")),
    )

    await super().start()
    assert (  # nosec B101
        self.declarer
    ), "Declarer should be initialized in `connect` method"

    for publisher in self._publishers.values():
        if publisher.exchange is not None:
            await self.declare_exchange(publisher.exchange)

    for handler in self.handlers.values():
        c = self._get_log_context(None, handler.queue, handler.exchange)
        self._log(f"`{handler.call_name}` waiting for messages", extra=c)
        await handler.start(self.declarer)

subscriber #

subscriber(
    queue: Union[str, RabbitQueue],
    exchange: Union[str, RabbitExchange, None] = None,
    *,
    consume_args: Optional[AnyDict] = None,
    dependencies: Sequence[Depends] = (),
    parser: Optional[
        CustomParser[
            aio_pika.IncomingMessage, RabbitMessage
        ]
    ] = None,
    decoder: Optional[CustomDecoder[RabbitMessage]] = None,
    middlewares: Optional[
        Sequence[
            Callable[
                [aio_pika.IncomingMessage], BaseMiddleware
            ]
        ]
    ] = None,
    filter: Filter[RabbitMessage] = default_filter,
    title: Optional[str] = None,
    description: Optional[str] = None,
    **original_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        aio_pika.IncomingMessage,
        P_HandlerParams,
        T_HandlerReturn,
    ],
]

Decorator to define a message subscriber.

PARAMETER DESCRIPTION
queue

The name of the RabbitMQ queue.

TYPE: Union[str, RabbitQueue]

exchange

The name of the RabbitMQ exchange. Defaults to None.

TYPE: Union[str, RabbitExchange, None] DEFAULT: None

consume_args

Additional arguments for message consumption.

TYPE: Optional[AnyDict] DEFAULT: None

title

Title for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

description

Description for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
Callable

A decorator function for defining message subscribers.

TYPE: Callable[[Callable[P_HandlerParams, T_HandlerReturn]], HandlerCallWrapper[IncomingMessage, P_HandlerParams, T_HandlerReturn]]

Source code in faststream/rabbit/broker.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Union[str, RabbitQueue],
    exchange: Union[str, RabbitExchange, None] = None,
    *,
    consume_args: Optional[AnyDict] = None,
    # broker arguments
    dependencies: Sequence[Depends] = (),
    parser: Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]] = None,
    decoder: Optional[CustomDecoder[RabbitMessage]] = None,
    middlewares: Optional[
        Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]]
    ] = None,
    filter: Filter[RabbitMessage] = default_filter,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    **original_kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[aio_pika.IncomingMessage, P_HandlerParams, T_HandlerReturn],
]:
    """
    Decorator to define a message subscriber.

    Args:
        queue (Union[str, RabbitQueue]): The name of the RabbitMQ queue.
        exchange (Union[str, RabbitExchange, None], optional): The name of the RabbitMQ exchange. Defaults to None.
        consume_args (Optional[AnyDict], optional): Additional arguments for message consumption.
        title (Optional[str]): Title for AsyncAPI docs.
        description (Optional[str]): Description for AsyncAPI docs.

    Returns:
        Callable: A decorator function for defining message subscribers.
    """
    super().subscriber()

    r_queue = RabbitQueue.validate(queue)
    r_exchange = RabbitExchange.validate(exchange)

    self._setup_log_context(r_queue, r_exchange)

    key = get_routing_hash(r_queue, r_exchange)
    handler = self.handlers.get(
        key,
        Handler(
            log_context_builder=partial(
                self._get_log_context, queue=r_queue, exchange=r_exchange
            ),
            queue=r_queue,
            exchange=r_exchange,
            consume_args=consume_args,
            description=description,
            title=title,
            virtual_host=self.virtual_host,
        ),
    )

    self.handlers[key] = handler

    def consumer_wrapper(
        func: Callable[P_HandlerParams, T_HandlerReturn],
    ) -> HandlerCallWrapper[
        aio_pika.IncomingMessage, P_HandlerParams, T_HandlerReturn
    ]:
        """Wraps a consumer function with additional functionality.

        Args:
            func: The consumer function to be wrapped.

        Returns:
            The wrapped consumer function.

        Raises:
            NotImplementedError: If silent animals are not supported.
        !!! note

            The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
        """
        handler_call, dependant = self._wrap_handler(
            func,
            extra_dependencies=dependencies,
            **original_kwargs,
        )

        handler.add_call(
            handler=handler_call,
            filter=filter,
            middlewares=middlewares,
            parser=parser or self._global_parser,
            decoder=decoder or self._global_decoder,
            dependant=dependant,
        )

        return handler_call

    return consumer_wrapper

Last update: 2023-11-13