Skip to content

BrokerRouter

faststream.broker.router.BrokerRouter #

BrokerRouter(prefix: str = '', handlers: Sequence[BrokerRoute[MsgType, SendableMessage]] = (), dependencies: Sequence[Depends] = (), middlewares: Optional[Sequence[Callable[[StreamMessage[MsgType]], AsyncContextManager[None]]]] = None, parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None, decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None, include_in_schema: Optional[bool] = None)

Bases: Generic[PublisherKeyType, MsgType]

A generic class representing a broker router.

METHOD DESCRIPTION
_get_publisher_key

abstract method to get the publisher key

_update_publisher_prefix

abstract method to update the publisher prefix

__init__

constructor method

subscriber

abstract method to define a subscriber

_wrap_subscriber

method to wrap a subscriber function

publisher

abstract method to define a publisher

include_router

method to include a router

include_routers

method to include multiple routers

Initialize the broker router.

PARAMETER DESCRIPTION
prefix

Prefix for the object.

TYPE: str DEFAULT: ''

handlers

Handlers for the object.

TYPE: Sequence[BrokerRoute[MsgType, SendableMessage]] DEFAULT: ()

dependencies

Dependencies for the object.

TYPE: Sequence[Depends] DEFAULT: ()

middlewares

Middlewares for the object.

TYPE: Optional[Sequence[Callable[[StreamMessage[MsgType]], AsyncContextManager[None]]]] DEFAULT: None

parser

Parser for the object.

TYPE: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] DEFAULT: None

decoder

Decoder for the object.

TYPE: Optional[CustomDecoder[StreamMessage[MsgType]]] DEFAULT: None

include_in_schema

Whether to include the object in the schema.

TYPE: Optional[bool] DEFAULT: None

Source code in faststream/broker/router.py
def __init__(
    self,
    prefix: str = "",
    handlers: Sequence[BrokerRoute[MsgType, SendableMessage]] = (),
    dependencies: Sequence[Depends] = (),
    middlewares: Optional[
        Sequence[
            Callable[
                [StreamMessage[MsgType]],
                AsyncContextManager[None],
            ]
        ]
    ] = None,
    parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
    decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
    include_in_schema: Optional[bool] = None,
) -> None:
    """Set up the router's initial state from the given options.

    Args:
        prefix: Prefix stored on the public ``prefix`` attribute.
        handlers: Initial handlers; copied into a new list.
        dependencies: Dependencies stored as given.
        middlewares: Middlewares stored as given (may be ``None``).
        parser: Custom parser stored as given (may be ``None``).
        decoder: Custom decoder stored as given (may be ``None``).
        include_in_schema: Schema-inclusion flag stored on the public
            ``include_in_schema`` attribute.

    """
    # Public attributes.
    self.include_in_schema = include_in_schema
    self.prefix = prefix

    # The handlers are copied so later changes to the list held here do not
    # touch the sequence the caller passed in.
    self._handlers = list(handlers)

    # Remaining private state: publishers start out empty, everything else
    # is kept exactly as passed.
    self._decoder = decoder
    self._parser = parser
    self._middlewares = middlewares
    self._dependencies = dependencies
    self._publishers = {}

include_in_schema instance-attribute #

include_in_schema = include_in_schema

prefix instance-attribute #

prefix: str = prefix

include_router #

include_router(router: BrokerRouter[PublisherKeyType, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[PublisherKeyType, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_router(self, router: "BrokerRouter[PublisherKeyType, MsgType]") -> None:
    """Merge another router's handlers and publishers into this object.

    Each handler recorded on ``router`` is registered again through this
    object's ``subscriber``, using the handler's stored positional and
    keyword arguments. Each publisher of ``router`` is passed through
    ``_update_publisher_prefix`` together with this object's ``prefix`` and
    then stored under the key returned by ``_get_publisher_key``; if a
    publisher is already stored under that key, the existing one is kept.

    Args:
        router: The router whose handlers and publishers are included.

    Returns:
        None

    """
    for route in router._handlers:
        register = self.subscriber(*route.args, **route.kwargs)
        register(route.call)

    for publisher in router._publishers.values():
        prefixed = self._update_publisher_prefix(self.prefix, publisher)
        # First registration wins: an existing entry is left untouched.
        self._publishers.setdefault(self._get_publisher_key(prefixed), prefixed)

include_routers #

include_routers(*routers: BrokerRouter[PublisherKeyType, MsgType]) -> None

Includes routers in the object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[PublisherKeyType, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_routers(
    self, *routers: "BrokerRouter[PublisherKeyType, MsgType]"
) -> None:
    """Include several routers, one after another.

    Every router is handed to ``include_router`` in the order given.

    Args:
        *routers: The routers to include.

    Returns:
        None

    """
    for router in routers:
        self.include_router(router)

publisher abstractmethod #

publisher(subj: str, *args: Any, **kwargs: Any) -> BasePublisher[MsgType]

Defines a publisher for the given subject.

PARAMETER DESCRIPTION
subj

Subject of the message

TYPE: str

*args

Additional arguments

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
BasePublisher[MsgType]

The publisher object

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented

Source code in faststream/broker/router.py
@abstractmethod
def publisher(
    self,
    subj: str,
    *args: Any,
    **kwargs: Any,
) -> BasePublisher[MsgType]:
    """Define a publisher for the given subject.

    This is only the abstract declaration: the body here raises, and
    concrete routers are expected to override it.

    Args:
        subj: Subject the publisher is defined for.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        The publisher object.

    Raises:
        NotImplementedError: Always, in this base implementation.

    """
    raise NotImplementedError

subscriber abstractmethod #

subscriber(subj: str, *args: Any, dependencies: Sequence[Depends] = (), middlewares: Optional[Sequence[Callable[[StreamMessage[MsgType]], AsyncContextManager[None]]]] = None, parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None, decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None, include_in_schema: Optional[bool] = None, **kwargs: Any) -> Callable[[Callable[P_HandlerParams, T_HandlerReturn]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]

Defines a subscriber for the given subject.

PARAMETER DESCRIPTION
subj

subject to subscribe to

*args

additional arguments

DEFAULT: ()

dependencies

sequence of dependencies

DEFAULT: ()

middlewares

optional sequence of middlewares

DEFAULT: None

parser

optional custom parser

DEFAULT: None

decoder

optional custom decoder

DEFAULT: None

include_in_schema

whether to include the object in the schema

DEFAULT: None

**kwargs

additional keyword arguments

DEFAULT: {}

RETURNS DESCRIPTION
Callable[[Callable[P_HandlerParams, T_HandlerReturn]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]

A decorator that takes a handler function and returns it wrapped in a HandlerCallWrapper

RAISES DESCRIPTION
NotImplementedError

If the function is not implemented

Source code in faststream/broker/router.py
@abstractmethod
def subscriber(
    self,
    subj: str,
    *args: Any,
    dependencies: Sequence[Depends] = (),
    middlewares: Optional[
        Sequence[
            Callable[
                [StreamMessage[MsgType]],
                AsyncContextManager[None],
            ]
        ]
    ] = None,
    parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
    decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
    include_in_schema: Optional[bool] = None,
    **kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
]:
    """Define a subscriber for the given subject.

    This is only the abstract declaration: the body here raises, and
    concrete routers are expected to override it.

    Args:
        subj: Subject to subscribe to.
        *args: Additional positional arguments.
        dependencies: Sequence of dependencies.
        middlewares: Optional sequence of middlewares.
        parser: Optional custom parser.
        decoder: Optional custom decoder.
        include_in_schema: Whether to include the object in the schema.
        **kwargs: Additional keyword arguments.

    Returns:
        A decorator that accepts a handler callable and returns a
        ``HandlerCallWrapper`` for it.

    Raises:
        NotImplementedError: Always, in this base implementation.

    """
    raise NotImplementedError