Skip to content

RabbitBroker

faststream.rabbit.broker.RabbitBroker #

RabbitBroker(
    url: str
    | URL
    | None = "amqp://guest:guest@localhost:5672/",
    *,
    host: str = None,
    port: int = None,
    login: str = None,
    password: str = None,
    virtualhost: str = None,
    ssl_options: SSLOptions | None = None,
    client_properties: FieldTable | None = None,
    max_consumers: int | None = None,
    protocol: str = None,
    protocol_version: str | None = "0.9.1",
    security: BaseSecurity | None = None,
    **kwargs: Any
)

Bases: RabbitLoggingMixin, BrokerAsyncUsecase[IncomingMessage, RobustConnection]

A RabbitMQ broker for FastAPI applications.

This class extends the base BrokerAsyncUsecase and provides asynchronous support for RabbitMQ as a message broker.

PARAMETER DESCRIPTION
url

The RabbitMQ connection URL. Defaults to "amqp://guest:guest@localhost:5672/".

TYPE: Union[str, URL, None] DEFAULT: 'amqp://guest:guest@localhost:5672/'

max_consumers

Maximum number of consumers to limit message consumption. Defaults to None.

TYPE: Optional[int] DEFAULT: None

protocol

The protocol to use (e.g., "amqp"). Defaults to "amqp".

TYPE: str DEFAULT: None

protocol_version

The protocol version to use (e.g., "0.9.1"). Defaults to "0.9.1".

TYPE: Optional[str] DEFAULT: '0.9.1'

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Initialize the RabbitBroker.

PARAMETER DESCRIPTION
url

The RabbitMQ connection URL. Defaults to "amqp://guest:guest@localhost:5672/".

TYPE: Union[str, URL, None] DEFAULT: 'amqp://guest:guest@localhost:5672/'

host

The RabbitMQ host. Defaults to None.

TYPE: Optional[str] DEFAULT: None

port

The RabbitMQ port. Defaults to None.

TYPE: Optional[int] DEFAULT: None

login

The RabbitMQ login. Defaults to None.

TYPE: Optional[str] DEFAULT: None

password

The RabbitMQ password. Defaults to None.

TYPE: Optional[str] DEFAULT: None

virtualhost

The RabbitMQ virtual host. Defaults to None.

TYPE: Optional[str] DEFAULT: None

ssl_options

The RabbitMQ SSL options. Defaults to None.

TYPE: Optional[SSLOptions] DEFAULT: None

client_properties

The RabbitMQ client properties. Defaults to None.

TYPE: Optional[FieldTable] DEFAULT: None

max_consumers

Maximum number of consumers to limit message consumption. Defaults to None.

TYPE: Optional[int] DEFAULT: None

protocol

The protocol to use (e.g., "amqp"). Defaults to "amqp".

TYPE: str DEFAULT: None

protocol_version

The protocol version to use (e.g., "0.9.1"). Defaults to "0.9.1".

TYPE: Optional[str] DEFAULT: '0.9.1'

security

The security mechanism to use. Defaults to None.

TYPE: Optional[BaseSecurity] DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in faststream/rabbit/broker.py
def __init__(
    self,
    url: Union[str, URL, None] = "amqp://guest:guest@localhost:5672/",
    *,
    # connection args
    host: Optional[str] = None,
    port: Optional[int] = None,
    login: Optional[str] = None,
    password: Optional[str] = None,
    virtualhost: Optional[str] = None,
    ssl_options: Optional[SSLOptions] = None,
    client_properties: Optional[FieldTable] = None,
    # broker args
    max_consumers: Optional[int] = None,
    protocol: Optional[str] = None,
    protocol_version: Optional[str] = "0.9.1",
    security: Optional[BaseSecurity] = None,
    **kwargs: Any,
) -> None:
    """Initialize the RabbitBroker.

    Args:
        url (Union[str, URL, None], optional): The RabbitMQ connection URL. Defaults to "amqp://guest:guest@localhost:5672/".
        host (Optional[str], optional): The RabbitMQ host. Defaults to None.
        port (Optional[int], optional): The RabbitMQ port. Defaults to None.
        login (Optional[str], optional): The RabbitMQ login. Defaults to None.
        password (Optional[str], optional): The RabbitMQ password. Defaults to None.
        virtualhost (Optional[str], optional): The RabbitMQ virtual host. Defaults to None.
        ssl_options (Optional[SSLOptions], optional): The RabbitMQ SSL options. Defaults to None.
        client_properties (Optional[FieldTable], optional): The RabbitMQ client properties. Defaults to None.
        max_consumers (Optional[int], optional): Maximum number of consumers to limit message consumption. Defaults to None.
        protocol (str, optional): The protocol to use (e.g., "amqp"). Defaults to "amqp".
        protocol_version (Optional[str], optional): The protocol version to use (e.g., "0.9.1"). Defaults to "0.9.1".
        security (Optional[BaseSecurity], optional): The security mechanism to use. Defaults to None.
        **kwargs: Additional keyword arguments.
    """
    security_args = parse_security(security)

    if (ssl := kwargs.get("ssl")) or kwargs.get("ssl_context"):  # pragma: no cover
        warnings.warn(
            (
                f"\nRabbitMQ {'`ssl`' if ssl else '`ssl_context`'} option was deprecated and will be removed in 0.4.0"
                "\nPlease, use `security` with `BaseSecurity` or `SASLPlaintext` instead"
            ),
            DeprecationWarning,
            stacklevel=2,
        )

    amqp_url = build_url(
        url,
        host=host,
        port=port,
        login=security_args.get("login", login),
        password=security_args.get("password", password),
        virtualhost=virtualhost,
        ssl=security_args.get("ssl", kwargs.pop("ssl", False)),
        ssl_options=ssl_options,
        client_properties=client_properties,
    )

    super().__init__(
        url=str(amqp_url),
        protocol_version=protocol_version,
        security=security,
        ssl_context=security_args.get(
            "ssl_context",
            kwargs.pop("ssl_context", None),
        ),
        **kwargs,
    )

    # respect ascynapi_url argument scheme
    asyncapi_url = build_url(self.url)
    self.protocol = protocol or asyncapi_url.scheme
    self.virtual_host = asyncapi_url.path

    self._max_consumers = max_consumers

    self._channel = None
    self.declarer = None
    self._producer = None

declarer instance-attribute #

declarer: RabbitDeclarer | None = None

dependencies instance-attribute #

dependencies: Sequence[Depends] = dependencies

description instance-attribute #

description = description

fmt property #

fmt: str

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

handlers instance-attribute #

handlers: dict[int, Handler]

log_level instance-attribute #

log_level: int

logger instance-attribute #

logger: Optional[Logger]

middlewares instance-attribute #

middlewares: Sequence[Callable[[MsgType], BaseMiddleware]]

protocol instance-attribute #

protocol = protocol or scheme

protocol_version instance-attribute #

protocol_version = protocol_version

security instance-attribute #

security = security

started instance-attribute #

started: bool = False

tags instance-attribute #

tags = tags

url instance-attribute #

url: str

virtual_host instance-attribute #

virtual_host = path

close async #

close(
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None

Closes the object.

PARAMETER DESCRIPTION
exc_type

The type of the exception being handled, if any.

TYPE: Optional[Type[BaseException]] DEFAULT: None

exc_val

The exception instance being handled, if any.

TYPE: Optional[BaseException] DEFAULT: None

exec_tb

The traceback of the exception being handled, if any.

TYPE: Optional[TracebackType] DEFAULT: None

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/asynchronous.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None:
    """Closes the object.

    Args:
        exc_type: The type of the exception being handled, if any.
        exc_val: The exception instance being handled, if any.
        exec_tb: The traceback of the exception being handled, if any.

    Returns:
        None

    Raises:
        NotImplementedError: If the method is not implemented.

    """
    super()._abc_close(exc_type, exc_val, exec_tb)

    for h in self.handlers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exec_tb)

connect async #

connect(*args: Any, **kwargs: Any) -> RobustConnection

Connect to the RabbitMQ server.

PARAMETER DESCRIPTION
*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
RobustConnection

aio_pika.RobustConnection: The RabbitMQ connection instance.

Source code in faststream/rabbit/broker.py
async def connect(self, *args: Any, **kwargs: Any) -> aio_pika.RobustConnection:
    """Connect to the RabbitMQ server.

    Args:
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        aio_pika.RobustConnection: The RabbitMQ connection instance.
    """
    connection = await super().connect(*args, **kwargs)
    for p in self._publishers.values():
        p._producer = self._producer
    return connection

declare_exchange async #

declare_exchange(
    exchange: RabbitExchange,
) -> RobustExchange

Declare a RabbitMQ exchange.

PARAMETER DESCRIPTION
exchange

The RabbitMQ exchange to declare.

TYPE: RabbitExchange

RETURNS DESCRIPTION
RobustExchange

aio_pika.RobustExchange: The declared RabbitMQ exchange.

RAISES DESCRIPTION
RuntimeError

If the declarer is not initialized in the connect method.

Source code in faststream/rabbit/broker.py
async def declare_exchange(
    self,
    exchange: RabbitExchange,
) -> aio_pika.RobustExchange:
    """Declare a RabbitMQ exchange.

    Args:
        exchange (RabbitExchange): The RabbitMQ exchange to declare.

    Returns:
        aio_pika.RobustExchange: The declared RabbitMQ exchange.

    Raises:
        RuntimeError: If the declarer is not initialized in the `connect` method.
    """
    assert self.declarer, NOT_CONNECTED_YET  # nosec B101
    return await self.declarer.declare_exchange(exchange)

declare_queue async #

declare_queue(queue: RabbitQueue) -> RobustQueue

Declare a RabbitMQ queue.

PARAMETER DESCRIPTION
queue

The RabbitMQ queue to declare.

TYPE: RabbitQueue

RETURNS DESCRIPTION
RobustQueue

aio_pika.RobustQueue: The declared RabbitMQ queue.

RAISES DESCRIPTION
RuntimeError

If the declarer is not initialized in the connect method.

Source code in faststream/rabbit/broker.py
async def declare_queue(
    self,
    queue: RabbitQueue,
) -> aio_pika.RobustQueue:
    """Declare a RabbitMQ queue.

    Args:
        queue (RabbitQueue): The RabbitMQ queue to declare.

    Returns:
        aio_pika.RobustQueue: The declared RabbitMQ queue.

    Raises:
        RuntimeError: If the declarer is not initialized in the `connect` method.
    """
    assert self.declarer, NOT_CONNECTED_YET  # nosec B101
    return await self.declarer.declare_queue(queue)

include_router #

include_router(router: BrokerRouter[Any, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[Any, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_router(self, router: BrokerRouter[Any, MsgType]) -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for r in router._handlers:
        self.subscriber(*r.args, **r.kwargs)(r.call)

    self._publishers = {**self._publishers, **router._publishers}

include_routers #

include_routers(
    *routers: BrokerRouter[Any, MsgType]
) -> None

Includes routers in the current object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[Any, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: BrokerRouter[Any, MsgType]) -> None:
    """Includes routers in the current object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for r in routers:
        self.include_router(r)

publish async #

publish(
    *args: Any, **kwargs: Any
) -> ConfirmationFrameType | SendableMessage

Publish a message to the RabbitMQ broker.

PARAMETER DESCRIPTION
*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ConfirmationFrameType | SendableMessage

Union[aiormq.abc.ConfirmationFrameType, SendableMessage]: The confirmation frame or the response message.

Source code in faststream/rabbit/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    *args: Any,
    **kwargs: Any,
) -> Union[aiormq.abc.ConfirmationFrameType, SendableMessage]:
    """Publish a message to the RabbitMQ broker.

    Args:
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        Union[aiormq.abc.ConfirmationFrameType, SendableMessage]: The confirmation frame or the response message.
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101
    return await self._producer.publish(*args, **kwargs)

publisher #

publisher(
    queue: RabbitQueue | str = "",
    exchange: RabbitExchange | str | None = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    persist: bool = False,
    reply_to: str | None = None,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
    priority: int | None = None,
    **message_kwargs: Any
) -> Publisher

Define a message publisher.

PARAMETER DESCRIPTION
queue

The name of the RabbitMQ queue. Defaults to "".

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

The name of the RabbitMQ exchange. Defaults to None.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

The routing key for messages. Defaults to "".

TYPE: str DEFAULT: ''

mandatory

Whether the message is mandatory. Defaults to True.

TYPE: bool DEFAULT: True

immediate

Whether the message should be sent immediately. Defaults to False.

TYPE: bool DEFAULT: False

timeout

Timeout for message publishing. Defaults to None.

TYPE: TimeoutType DEFAULT: None

persist

Whether to persist messages. Defaults to False.

TYPE: bool DEFAULT: False

reply_to

The reply-to queue name. Defaults to None.

TYPE: Optional[str] DEFAULT: None

title

Title for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

description

Description for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

schema

Schema for AsyncAPI docs.

TYPE: Optional[Any] DEFAULT: None

include_in_schema

Whether to include the publisher in AsyncAPI docs.

TYPE: bool DEFAULT: True

priority

Priority for the message.

TYPE: Optional[int] DEFAULT: None

**message_kwargs

Additional message properties and content.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Publisher

A message publisher instance.

TYPE: Publisher

Source code in faststream/rabbit/broker.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    persist: bool = False,
    reply_to: Optional[str] = None,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
    priority: Optional[int] = None,
    **message_kwargs: Any,
) -> Publisher:
    """Define a message publisher.

    Args:
        queue (Union[RabbitQueue, str], optional): The name of the RabbitMQ queue. Defaults to "".
        exchange (Union[RabbitExchange, str, None], optional): The name of the RabbitMQ exchange. Defaults to None.
        routing_key (str, optional): The routing key for messages. Defaults to "".
        mandatory (bool, optional): Whether the message is mandatory. Defaults to True.
        immediate (bool, optional): Whether the message should be sent immediately. Defaults to False.
        timeout (TimeoutType, optional): Timeout for message publishing. Defaults to None.
        persist (bool, optional): Whether to persist messages. Defaults to False.
        reply_to (Optional[str], optional): The reply-to queue name. Defaults to None.
        title (Optional[str]): Title for AsyncAPI docs.
        description (Optional[str]): Description for AsyncAPI docs.
        schema (Optional[Any]): Schema for AsyncAPI docs.
        include_in_schema (bool): Whether to include the publisher in AsyncAPI docs.
        priority (Optional[int]): Priority for the message.
        **message_kwargs (Any): Additional message properties and content.

    Returns:
        Publisher: A message publisher instance.
    """
    q, ex = RabbitQueue.validate(queue), RabbitExchange.validate(exchange)

    publisher = Publisher(
        title=title,
        queue=q,
        exchange=ex,
        routing_key=routing_key,
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        priority=priority,
        message_kwargs=message_kwargs,
        _description=description,
        _schema=schema,
        virtual_host=self.virtual_host,
        include_in_schema=include_in_schema,
    )

    key = publisher._get_routing_hash()
    publisher = self._publishers.get(key, publisher)
    super().publisher(key, publisher)
    if self._producer is not None:
        publisher._producer = self._producer
    return publisher

start async #

start() -> None

Start the RabbitMQ broker.

RAISES DESCRIPTION
RuntimeError

If the declarer is not initialized in the connect method.

Source code in faststream/rabbit/broker.py
async def start(self) -> None:
    """Start the RabbitMQ broker.

    Raises:
        RuntimeError: If the declarer is not initialized in the `connect` method.
    """
    context.set_global(
        "default_log_context",
        self._get_log_context(None, RabbitQueue(""), RabbitExchange("")),
    )

    await super().start()
    assert self.declarer, NOT_CONNECTED_YET  # nosec B101

    for publisher in self._publishers.values():
        if publisher.exchange is not None:
            await self.declare_exchange(publisher.exchange)

    for handler in self.handlers.values():
        c = self._get_log_context(None, handler.queue, handler.exchange)
        self._log(f"`{handler.call_name}` waiting for messages", extra=c)
        await handler.start(self.declarer)

subscriber #

subscriber(
    queue: str | RabbitQueue,
    exchange: str | RabbitExchange | None = None,
    *,
    consume_args: AnyDict | None = None,
    reply_config: ReplyConfig | None = None,
    dependencies: Sequence[Depends] = (),
    parser: CustomParser[IncomingMessage, RabbitMessage]
    | None = None,
    decoder: CustomDecoder[RabbitMessage] | None = None,
    middlewares: Sequence[
        Callable[[IncomingMessage], BaseMiddleware]
    ]
    | None = None,
    filter: Filter[RabbitMessage] = default_filter,
    no_ack: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    **original_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        IncomingMessage, P_HandlerParams, T_HandlerReturn
    ],
]

Decorator to define a message subscriber.

PARAMETER DESCRIPTION
queue

The name of the RabbitMQ queue.

TYPE: Union[str, RabbitQueue]

exchange

The name of the RabbitMQ exchange. Defaults to None.

TYPE: Union[str, RabbitExchange, None] DEFAULT: None

consume_args

Additional arguments for message consumption.

TYPE: Optional[AnyDict] DEFAULT: None

reply_config

The reply configuration for the message.

TYPE: Optional[ReplyConfig] DEFAULT: None

dependencies

Additional dependencies for the handler function. Defaults to ().

TYPE: Sequence[Depends] DEFAULT: ()

parser

Optional custom parser for parsing the input. Defaults to None.

TYPE: Optional[CustomParser[IncomingMessage, RabbitMessage]] DEFAULT: None

decoder

Optional custom decoder for decoding the input. Defaults to None.

TYPE: Optional[CustomDecoder[RabbitMessage]] DEFAULT: None

middlewares

Optional sequence of middlewares to be applied. Defaults to None.

TYPE: Optional[Sequence[Callable[[IncomingMessage], BaseMiddleware]]] DEFAULT: None

filter

Optional filter for filtering messages. Defaults to default_filter.

TYPE: Filter[RabbitMessage] DEFAULT: default_filter

no_ack

Whether not to ack/nack/reject messages.

TYPE: bool DEFAULT: False

title

Title for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

description

Description for AsyncAPI docs.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Whether to include the handler in AsyncAPI docs.

TYPE: bool DEFAULT: True

**original_kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Callable

A decorator function for defining message subscribers.

TYPE: Callable[[Callable[P_HandlerParams, T_HandlerReturn]], HandlerCallWrapper[IncomingMessage, P_HandlerParams, T_HandlerReturn]]

Source code in faststream/rabbit/broker.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Union[str, RabbitQueue],
    exchange: Union[str, RabbitExchange, None] = None,
    *,
    consume_args: Optional[AnyDict] = None,
    reply_config: Optional[ReplyConfig] = None,
    # broker arguments
    dependencies: Sequence[Depends] = (),
    parser: Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]] = None,
    decoder: Optional[CustomDecoder[RabbitMessage]] = None,
    middlewares: Optional[
        Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]]
    ] = None,
    filter: Filter[RabbitMessage] = default_filter,
    no_ack: bool = False,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    include_in_schema: bool = True,
    **original_kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[aio_pika.IncomingMessage, P_HandlerParams, T_HandlerReturn],
]:
    """Decorator to define a message subscriber.

    Args:
        queue (Union[str, RabbitQueue]): The name of the RabbitMQ queue.
        exchange (Union[str, RabbitExchange, None], optional): The name of the RabbitMQ exchange. Defaults to None.
        consume_args (Optional[AnyDict], optional): Additional arguments for message consumption.
        reply_config (Optional[ReplyConfig], optional): The reply configuration for the message.
        dependencies (Sequence[Depends], optional): Additional dependencies for the handler function. Defaults to ().
        parser (Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]], optional): Optional custom parser for parsing the input. Defaults to None.
        decoder (Optional[CustomDecoder[RabbitMessage]], optional): Optional custom decoder for decoding the input. Defaults to None.
        middlewares (Optional[Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]]], optional): Optional sequence of middlewares to be applied. Defaults to None.
        filter (Filter[RabbitMessage], optional): Optional filter for filtering messages. Defaults to default_filter.
        no_ack (bool): Whether not to ack/nack/reject messages.
        title (Optional[str]): Title for AsyncAPI docs.
        description (Optional[str]): Description for AsyncAPI docs.
        include_in_schema (bool): Whether to include the handler in AsyncAPI docs.
        **original_kwargs (Any): Additional keyword arguments.

    Returns:
        Callable: A decorator function for defining message subscribers.
    """
    super().subscriber()

    r_queue = RabbitQueue.validate(queue)
    r_exchange = RabbitExchange.validate(exchange)

    self._setup_log_context(r_queue, r_exchange)

    key = get_routing_hash(r_queue, r_exchange)
    handler = self.handlers.get(
        key,
        Handler(
            log_context_builder=partial(
                self._get_log_context, queue=r_queue, exchange=r_exchange
            ),
            queue=r_queue,
            exchange=r_exchange,
            consume_args=consume_args,
            description=description,
            title=title,
            virtual_host=self.virtual_host,
            include_in_schema=include_in_schema,
            graceful_timeout=self.graceful_timeout,
        ),
    )

    self.handlers[key] = handler

    def consumer_wrapper(
        func: Callable[P_HandlerParams, T_HandlerReturn],
    ) -> HandlerCallWrapper[
        aio_pika.IncomingMessage, P_HandlerParams, T_HandlerReturn
    ]:
        """Wraps a consumer function with additional functionality.

        Args:
            func: The consumer function to be wrapped.

        Returns:
            The wrapped consumer function.

        Raises:
            NotImplementedError: If silent animals are not supported.
        !!! note

            The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
        """
        handler_call, dependant = self._wrap_handler(
            func,
            extra_dependencies=dependencies,
            no_ack=no_ack,
            _process_kwargs={
                "reply_config": reply_config,
            },
            **original_kwargs,
        )

        handler.add_call(
            handler=handler_call,
            filter=filter,
            middlewares=middlewares,
            parser=parser or self._global_parser,
            decoder=decoder or self._global_decoder,
            dependant=dependant,
        )

        return handler_call

    return consumer_wrapper