Skip to content

RabbitRouter

faststream.rabbit.shared.router.RabbitRouter #

RabbitRouter(
    prefix: str = "",
    handlers: Sequence[
        BrokerRoute[IncomingMessage, SendableMessage]
    ] = (),
    **kwargs: Any
)

Bases: BrokerRouter[int, IncomingMessage]

A class representing a RabbitMQ router for handling incoming messages.

METHOD DESCRIPTION
__init__

initializes the RabbitRouter object

subscriber

decorator for subscribing to a queue and registering a handler function

Override the __init__ method of the parent class.

PARAMETER DESCRIPTION
prefix

A prefix string

TYPE: str DEFAULT: ''

handlers

A sequence of RabbitRoute objects

TYPE: Sequence[BrokerRoute[IncomingMessage, SendableMessage]] DEFAULT: ()

**kwargs

Additional keyword arguments

TYPE: Any DEFAULT: {}

RAISES DESCRIPTION
NotImplementedError

If silent animals are not supported

Source code in faststream/rabbit/shared/router.py
def __init__(
    self,
    prefix: str = "",
    handlers: Sequence[RabbitRoute[IncomingMessage, SendableMessage]] = (),
    **kwargs: Any,
) -> None:
    """Override the `__init__` method of the parent class.

    Args:
        prefix: A prefix string
        handlers: A sequence of RabbitRoute objects
        **kwargs: Additional keyword arguments

    Raises:
        NotImplementedError: If silent animals are not supported

    """
    for h in handlers:
        if (q := h.kwargs.pop("queue", None)) is None:
            q, h.args = h.args[0], h.args[1:]
        queue = RabbitQueue.validate(q)
        new_q = model_copy(queue, update={"name": prefix + queue.name})
        h.args = (new_q, *h.args)

    super().__init__(prefix, handlers, **kwargs)

include_in_schema instance-attribute #

include_in_schema = include_in_schema

prefix instance-attribute #

prefix: str = prefix

include_router #

include_router(
    router: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[PublisherKeyType, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_router(self, router: "BrokerRouter[PublisherKeyType, MsgType]") -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for h in router._handlers:
        self.subscriber(*h.args, **h.kwargs)(h.call)

    for p in router._publishers.values():
        p = self._update_publisher_prefix(self.prefix, p)
        key = self._get_publisher_key(p)
        self._publishers[key] = self._publishers.get(key, p)

include_routers #

include_routers(
    *routers: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes routers in the object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[PublisherKeyType, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_routers(
    self, *routers: "BrokerRouter[PublisherKeyType, MsgType]"
) -> None:
    """Includes routers in the object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for r in routers:
        self.include_router(r)

publisher abstractmethod #

publisher(
    subj: str, *args: Any, **kwargs: Any
) -> BasePublisher[MsgType]

Publishes a message.

PARAMETER DESCRIPTION
subj

Subject of the message

TYPE: str

*args

Additional arguments

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
BasePublisher[MsgType]

The published message

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented

Source code in faststream/broker/router.py
@abstractmethod
def publisher(
    self,
    subj: str,
    *args: Any,
    **kwargs: Any,
) -> BasePublisher[MsgType]:
    """Publishes a message.

    Args:
        subj: Subject of the message
        *args: Additional arguments
        **kwargs: Additional keyword arguments

    Returns:
        The published message

    Raises:
        NotImplementedError: If the method is not implemented

    """
    raise NotImplementedError()

subscriber #

subscriber(
    queue: Union[str, RabbitQueue],
    *broker_args: Any,
    **broker_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        IncomingMessage, P_HandlerParams, T_HandlerReturn
    ],
]

A function to subscribe to a RabbitMQ queue.

PARAMETER DESCRIPTION
self

the instance of the class

queue

the queue to subscribe to, can be a string or a RabbitQueue object

*broker_args

additional arguments for the broker

DEFAULT: ()

**broker_kwargs

additional keyword arguments for the broker

DEFAULT: {}

RETURNS DESCRIPTION
Callable[[Callable[P_HandlerParams, T_HandlerReturn]], HandlerCallWrapper[IncomingMessage, P_HandlerParams, T_HandlerReturn]]

A callable object that wraps the handler function for the incoming messages from the queue.

RAISES DESCRIPTION
TypeError

If the queue is not a string or a RabbitQueue object

Source code in faststream/rabbit/shared/router.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Union[str, RabbitQueue],
    *broker_args: Any,
    **broker_kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[IncomingMessage, P_HandlerParams, T_HandlerReturn],
]:
    """A function to subscribe to a RabbitMQ queue.

    Args:
        self : the instance of the class
        queue : the queue to subscribe to, can be a string or a RabbitQueue object
        *broker_args : additional arguments for the broker
        **broker_kwargs : additional keyword arguments for the broker

    Returns:
        A callable object that wraps the handler function for the incoming messages from the queue.

    Raises:
        TypeError: If the queue is not a string or a RabbitQueue object

    """
    q = RabbitQueue.validate(queue)
    new_q = model_copy(q, update={"name": self.prefix + q.name})
    return self._wrap_subscriber(new_q, *broker_args, **broker_kwargs)