Skip to content

BrokerRouter

faststream.broker.router.BrokerRouter #

BrokerRouter(*, handlers, prefix, dependencies, middlewares, parser, decoder, include_in_schema)

Bases: ABCBroker[MsgType]

A generic class representing a broker router.

Source code in faststream/broker/router.py
def __init__(
    self,
    *,
    handlers: Iterable[SubscriberRoute],
    # base options
    prefix: str,
    dependencies: Iterable["Depends"],
    middlewares: Sequence["BrokerMiddleware[MsgType]"],
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
    include_in_schema: Optional[bool],
) -> None:
    super().__init__(
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        parser=parser,
        decoder=decoder,
        include_in_schema=include_in_schema,
    )

    for h in handlers:
        call = h.call

        for p in h.publishers:
            call = self.publisher(*p.args, **p.kwargs)(call)

        self.subscriber(*h.args, **h.kwargs)(call)

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)

    for sub in self._subscribers.values():
        sub.add_middleware(middleware)

    for pub in self._publishers.values():
        pub.add_middleware(middleware)

subscriber abstractmethod #

subscriber(subscriber)
Source code in faststream/broker/core/abc.py
@abstractmethod
def subscriber(
    self,
    subscriber: "SubscriberProto[MsgType]",
) -> "SubscriberProto[MsgType]":
    subscriber.add_prefix(self.prefix)
    key = hash(subscriber)
    subscriber = self._subscribers.get(key, subscriber)
    self._subscribers = {**self._subscribers, key: subscriber}
    return subscriber

publisher abstractmethod #

publisher(publisher)
Source code in faststream/broker/core/abc.py
@abstractmethod
def publisher(
    self,
    publisher: "PublisherProto[MsgType]",
) -> "PublisherProto[MsgType]":
    publisher.add_prefix(self.prefix)
    key = hash(publisher)
    publisher = self._publishers.get(key, publisher)
    self._publishers = {**self._publishers, key: publisher}
    return publisher

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)