Skip to content

KafkaLoggingBroker

faststream.kafka.broker.logging.KafkaLoggingBroker #

KafkaLoggingBroker(*args, logger=EMPTY, log_level=logging.INFO, log_fmt=None, **kwargs)

Bases: BrokerUsecase[Union['aiokafka.ConsumerRecord', Tuple['aiokafka.ConsumerRecord', ...]], Callable[..., 'aiokafka.AIOKafkaConsumer']]

A class that extends the LoggingMixin class and adds additional functionality for logging Kafka related information.

Initialize the class.

Source code in faststream/kafka/broker/logging.py
def __init__(
    self,
    *args: Any,
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    log_fmt: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Initialize the class."""
    super().__init__(
        *args,
        logger=logger,
        # TODO: generate unique logger names to not share between brokers
        default_logger=get_broker_logger(
            name="kafka",
            default_context={
                "topic": "",
                "group_id": "",
            },
            message_id_ln=self.__max_msg_id_ln,
        ),
        log_level=log_level,
        log_fmt=log_fmt,
        **kwargs,
    )
    self._max_topic_len = 4
    self._max_group_len = 0

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

url instance-attribute #

url = asyncapi_url

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)

    for sub in self._subscribers.values():
        sub.add_middleware(middleware)

    for pub in self._publishers.values():
        pub.add_middleware(middleware)

subscriber abstractmethod #

subscriber(subscriber)
Source code in faststream/broker/core/abc.py
@abstractmethod
def subscriber(
    self,
    subscriber: "SubscriberProto[MsgType]",
) -> "SubscriberProto[MsgType]":
    subscriber.add_prefix(self.prefix)
    key = hash(subscriber)
    subscriber = self._subscribers.get(key, subscriber)
    self._subscribers = {**self._subscribers, key: subscriber}
    return subscriber

publisher #

publisher(*args, **kwargs)
Source code in faststream/broker/core/usecase.py
def publisher(self, *args: Any, **kwargs: Any) -> "PublisherProto[MsgType]":
    pub = super().publisher(*args, **kwargs)
    if self.running:
        self.setup_publisher(pub)
    return pub

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

start abstractmethod async #

start()

Start the broker async use case.

Source code in faststream/broker/core/usecase.py
@abstractmethod
async def start(self) -> None:
    """Start the broker async use case."""
    self._abc_start()
    await self.connect()

connect async #

connect(**kwargs)

Connect to a remote server.

Source code in faststream/broker/core/usecase.py
async def connect(self, **kwargs: Any) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        connection_kwargs = self._connection_kwargs.copy()
        connection_kwargs.update(kwargs)
        self._connection = await self._connect(**connection_kwargs)
    self.setup()
    return self._connection

setup_subscriber #

setup_subscriber(subscriber, **kwargs)

Setup the Subscriber to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Setup the Subscriber to prepare it to starting."""
    data = self._subscriber_setup_extra.copy()
    data.update(kwargs)
    subscriber.setup(**data)

setup_publisher #

setup_publisher(publisher, **kwargs)

Setup the Publisher to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Setup the Publisher to prepare it to starting."""
    data = self._publisher_setup_extra.copy()
    data.update(kwargs)
    publisher.setup(**data)

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Closes the object."""
    self.running = False

    for h in self._subscribers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exc_tb)

publish async #

publish(msg, *, producer, correlation_id=None, **kwargs)

Publish message directly.

Source code in faststream/broker/core/usecase.py
async def publish(
    self,
    msg: Any,
    *,
    producer: Optional["ProducerProto"],
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional[Any]:
    """Publish message directly."""
    assert producer, NOT_CONNECTED_YET  # nosec B101

    publish = producer.publish

    for m in self._middlewares:
        publish = partial(m(None).publish_scope, publish)

    return await publish(msg, correlation_id=correlation_id, **kwargs)

request async #

request(msg, *, producer, correlation_id=None, **kwargs)

Publish message directly.

Source code in faststream/broker/core/usecase.py
async def request(
    self,
    msg: Any,
    *,
    producer: Optional["ProducerProto"],
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Any:
    """Publish message directly."""
    assert producer, NOT_CONNECTED_YET  # nosec B101

    request = producer.request
    for m in self._middlewares:
        request = partial(m(None).publish_scope, request)

    published_msg = await request(
        msg,
        correlation_id=correlation_id,
        **kwargs,
    )

    async with AsyncExitStack() as stack:
        return_msg = return_input
        for m in self._middlewares:
            mid = m(published_msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)

        parsed_msg: StreamMessage[Any] = await producer._parser(published_msg)
        parsed_msg._decoded_body = await producer._decoder(parsed_msg)
        return await return_msg(parsed_msg)

ping abstractmethod async #

ping(timeout)

Check connection alive.

Source code in faststream/broker/core/usecase.py
@abstractmethod
async def ping(self, timeout: Optional[float]) -> bool:
    """Check connection alive."""
    raise NotImplementedError()

get_fmt #

get_fmt()
Source code in faststream/kafka/broker/logging.py
def get_fmt(self) -> str:
    return (
        "%(asctime)s %(levelname)-8s - "
        + f"%(topic)-{self._max_topic_len}s | "
        + (f"%(group_id)-{self._max_group_len}s | " if self._max_group_len else "")
        + f"%(message_id)-{self.__max_msg_id_ln}s "
        + "- %(message)s"
    )