Skip to content

LoggingBroker

faststream.broker.core.logging.LoggingBroker #

LoggingBroker(*args, default_logger, logger, log_level, log_fmt, **kwargs)

Bases: ABCBroker[MsgType]

A mixin class for logging.

Source code in faststream/broker/core/logging.py
def __init__(
    self,
    *args: Any,
    default_logger: Annotated[
        logging.Logger,
        Doc("Logger object to use if `logger` is not set."),
    ],
    logger: Annotated[
        Optional["LoggerProto"],
        Doc("User specified logger to pass into Context and log service messages."),
    ],
    log_level: Annotated[
        int,
        Doc("Service messages log level."),
    ],
    log_fmt: Annotated[
        Optional[str],
        Doc("Default logger log format."),
    ],
    **kwargs: Any,
) -> None:
    """Pick the active logger, store the logging settings, then chain to the parent.

    ``logger`` is used (and ``use_custom`` is set to ``True``) whenever it is
    not the ``EMPTY`` sentinel; otherwise ``default_logger`` is used and
    ``use_custom`` is ``False``. Remaining positional and keyword arguments
    are forwarded unchanged to the next ``__init__`` in the MRO.
    """
    # Identity check against the sentinel: any other value, including None,
    # is treated as a user-supplied logger.
    has_custom_logger = logger is not EMPTY
    self.logger = logger if has_custom_logger else default_logger
    self.use_custom = has_custom_logger

    self._msg_log_level, self._fmt = log_level, log_fmt

    super().__init__(*args, **kwargs)

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The new middleware will be used as the innermost of the already existing ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register ``middleware`` after all middlewares already present.

    The stored middleware tuple is replaced by a new tuple ending with
    ``middleware``, and the same middleware is then passed to
    ``add_middleware`` of every registered subscriber and publisher.
    Being last in the tuple, it acts as the innermost of the existing ones.
    """
    self._middlewares = tuple(self._middlewares) + (middleware,)

    # Subscribers are updated before publishers.
    for registered_subscriber in self._subscribers.values():
        registered_subscriber.add_middleware(middleware)

    for registered_publisher in self._publishers.values():
        registered_publisher.add_middleware(middleware)

subscriber abstractmethod #

subscriber(subscriber)
Source code in faststream/broker/core/abc.py
@abstractmethod
def subscriber(
    self,
    subscriber: "SubscriberProto[MsgType]",
) -> "SubscriberProto[MsgType]":
    """Register ``subscriber`` and return the instance stored under its hash.

    This object's prefix is applied to ``subscriber`` first. If an entry with
    the same hash is already registered, that existing entry is kept and
    returned instead of the object passed in.
    """
    subscriber.add_prefix(self.prefix)
    ident = hash(subscriber)

    # Build a new dict rather than mutating the current mapping in place.
    registry = dict(self._subscribers)
    registered = registry.setdefault(ident, subscriber)
    self._subscribers = registry
    return registered

publisher abstractmethod #

publisher(publisher)
Source code in faststream/broker/core/abc.py
@abstractmethod
def publisher(
    self,
    publisher: "PublisherProto[MsgType]",
) -> "PublisherProto[MsgType]":
    """Register ``publisher`` and return the instance stored under its hash.

    This object's prefix is applied to ``publisher`` first. If an entry with
    the same hash is already registered, that existing entry is kept and
    returned instead of the object passed in.
    """
    publisher.add_prefix(self.prefix)
    ident = hash(publisher)

    # Build a new dict rather than mutating the current mapping in place.
    registry = dict(self._publishers)
    registered = registry.setdefault(ident, publisher)
    self._publishers = registry
    return registered

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Register every subscriber and publisher of ``router`` on this object.

    Args:
        router: Object whose ``_subscribers`` and ``_publishers`` are merged in.
        prefix: Extra prefix appended to this object's own prefix before it is
            applied to the router's subscribers.
        dependencies: Extra dependencies placed between this object's
            dependencies and each new subscriber's own ones.
        middlewares: Extra middlewares placed between this object's
            middlewares and each new handler's own ones.
        include_in_schema: When not ``None``, overrides the schema flag of
            every newly registered handler; otherwise the flag is resolved
            through ``_solve_include_in_schema``.

    Handlers whose hash is already registered here still receive the prefix
    update but are otherwise left untouched.
    """
    # Materialize once: both arguments are typed as ``Iterable``, so a
    # generator or iterator would otherwise be exhausted by the first new
    # handler and every later handler would silently get nothing.
    extra_middlewares = tuple(middlewares)
    extra_dependencies = tuple(dependencies)

    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            # Order: this object's entries, then the extras, then the
            # handler's own.
            h._broker_middlewares = (
                *self._middlewares,
                *extra_middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *extra_dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        # NOTE(review): unlike subscribers, publishers only receive
        # ``self.prefix``; the ``prefix`` argument is not applied here.
        # Behavior kept as-is; confirm whether this asymmetry is intended.
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *extra_middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Include each given router, in order, via ``include_router``.

    Every router is included with ``include_router``'s default options.
    """
    for router in routers:
        self.include_router(router)

get_fmt abstractmethod #

get_fmt()

Fallback method to get the log format if log_fmt is not specified.

Source code in faststream/broker/core/logging.py
@abstractmethod
def get_fmt(self) -> str:
    """Return the fallback log format used when `log_fmt` is not specified.

    This base implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError