KafkaLoggingBroker(*args, logger=EMPTY, log_level=INFO, log_fmt=None, **kwargs)
Bases: BrokerUsecase[Union['confluent_kafka.Message', Tuple['confluent_kafka.Message', ...]], Callable[..., AsyncConfluentConsumer]]
A class that extends the LoggingMixin class and adds additional functionality for logging Kafka related information.
Initialize the class.
Source code in faststream/confluent/broker/logging.py
| def __init__(
self,
*args: Any,
logger: Optional["LoggerProto"] = EMPTY,
log_level: int = logging.INFO,
log_fmt: Optional[str] = None,
**kwargs: Any,
) -> None:
"""Initialize the class."""
super().__init__(
*args,
logger=logger,
# TODO: generate unique logger names to not share between brokers
default_logger=get_broker_logger(
name="confluent",
default_context={
"topic": "",
"group_id": "",
},
message_id_ln=self.__max_msg_id_ln,
),
log_level=log_level,
log_fmt=log_fmt,
**kwargs,
)
self._max_topic_len = 4
self._max_group_len = 0
|
prefix instance-attribute
include_in_schema instance-attribute
logger instance-attribute
use_custom instance-attribute
running instance-attribute
graceful_timeout instance-attribute
protocol instance-attribute
protocol_version instance-attribute
description instance-attribute
security instance-attribute
setup
Prepare all Broker entities to startup.
Source code in faststream/broker/core/usecase.py
| def setup(self) -> None:
"""Prepare all Broker entities to startup."""
for h in self._subscribers.values():
self.setup_subscriber(h)
for p in self._publishers.values():
self.setup_publisher(p)
|
add_middleware
add_middleware(middleware)
Append BrokerMiddleware to the end of middlewares list.
Current middleware will be used as a most inner of already existed ones.
Source code in faststream/broker/core/abc.py
| def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
"""Append BrokerMiddleware to the end of middlewares list.
Current middleware will be used as a most inner of already existed ones.
"""
self._middlewares = (*self._middlewares, middleware)
for sub in self._subscribers.values():
sub.add_middleware(middleware)
for pub in self._publishers.values():
pub.add_middleware(middleware)
|
subscriber abstractmethod
Source code in faststream/broker/core/abc.py
| @abstractmethod
def subscriber(
self,
subscriber: "SubscriberProto[MsgType]",
) -> "SubscriberProto[MsgType]":
subscriber.add_prefix(self.prefix)
key = hash(subscriber)
subscriber = self._subscribers.get(key, subscriber)
self._subscribers = {**self._subscribers, key: subscriber}
return subscriber
|
publisher
publisher(*args, **kwargs)
Source code in faststream/broker/core/usecase.py
| def publisher(self, *args: Any, **kwargs: Any) -> "PublisherProto[MsgType]":
pub = super().publisher(*args, **kwargs)
if self.running:
self.setup_publisher(pub)
return pub
|
include_router
include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)
Includes a router in the current object.
Source code in faststream/broker/core/abc.py
| def include_router(
self,
router: "ABCBroker[Any]",
*,
prefix: str = "",
dependencies: Iterable["Depends"] = (),
middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
include_in_schema: Optional[bool] = None,
) -> None:
"""Includes a router in the current object."""
for h in router._subscribers.values():
h.add_prefix("".join((self.prefix, prefix)))
if (key := hash(h)) not in self._subscribers:
if include_in_schema is None:
h.include_in_schema = self._solve_include_in_schema(
h.include_in_schema
)
else:
h.include_in_schema = include_in_schema
h._broker_middlewares = (
*self._middlewares,
*middlewares,
*h._broker_middlewares,
)
h._broker_dependencies = (
*self._dependencies,
*dependencies,
*h._broker_dependencies,
)
self._subscribers = {**self._subscribers, key: h}
for p in router._publishers.values():
p.add_prefix(self.prefix)
if (key := hash(p)) not in self._publishers:
if include_in_schema is None:
p.include_in_schema = self._solve_include_in_schema(
p.include_in_schema
)
else:
p.include_in_schema = include_in_schema
p._broker_middlewares = (
*self._middlewares,
*middlewares,
*p._broker_middlewares,
)
self._publishers = {**self._publishers, key: p}
|
include_routers
include_routers(*routers)
Includes routers in the object.
Source code in faststream/broker/core/abc.py
| def include_routers(
self,
*routers: "ABCBroker[MsgType]",
) -> None:
"""Includes routers in the object."""
for r in routers:
self.include_router(r)
|
start abstractmethod
async
Start the broker async use case.
Source code in faststream/broker/core/usecase.py
| @abstractmethod
async def start(self) -> None:
"""Start the broker async use case."""
self._abc_start()
await self.connect()
|
connect async
Connect to a remote server.
Source code in faststream/broker/core/usecase.py
| async def connect(self, **kwargs: Any) -> ConnectionType:
"""Connect to a remote server."""
if self._connection is None:
connection_kwargs = self._connection_kwargs.copy()
connection_kwargs.update(kwargs)
self._connection = await self._connect(**connection_kwargs)
self.setup()
return self._connection
|
setup_subscriber
setup_subscriber(subscriber, **kwargs)
Setup the Subscriber to prepare it to starting.
Source code in faststream/broker/core/usecase.py
| def setup_subscriber(
self,
subscriber: SubscriberProto[MsgType],
**kwargs: Any,
) -> None:
"""Setup the Subscriber to prepare it to starting."""
data = self._subscriber_setup_extra.copy()
data.update(kwargs)
subscriber.setup(**data)
|
setup_publisher
setup_publisher(publisher, **kwargs)
Setup the Publisher to prepare it to starting.
Source code in faststream/broker/core/usecase.py
| def setup_publisher(
self,
publisher: "PublisherProto[MsgType]",
**kwargs: Any,
) -> None:
"""Setup the Publisher to prepare it to starting."""
data = self._publisher_setup_extra.copy()
data.update(kwargs)
publisher.setup(**data)
|
close async
close(exc_type=None, exc_val=None, exc_tb=None)
Closes the object.
Source code in faststream/broker/core/usecase.py
| async def close(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exc_tb: Optional["TracebackType"] = None,
) -> None:
"""Closes the object."""
self.running = False
for h in self._subscribers.values():
await h.close()
if self._connection is not None:
await self._close(exc_type, exc_val, exc_tb)
|
publish async
publish(msg, *, producer, correlation_id=None, **kwargs)
Publish message directly.
Source code in faststream/broker/core/usecase.py
| async def publish(
self,
msg: Any,
*,
producer: Optional["ProducerProto"],
correlation_id: Optional[str] = None,
**kwargs: Any,
) -> Optional[Any]:
"""Publish message directly."""
assert producer, NOT_CONNECTED_YET # nosec B101
publish = producer.publish
for m in self._middlewares:
publish = partial(m(None).publish_scope, publish)
return await publish(msg, correlation_id=correlation_id, **kwargs)
|
request async
request(msg, *, producer, correlation_id=None, **kwargs)
Publish message directly.
Source code in faststream/broker/core/usecase.py
| async def request(
self,
msg: Any,
*,
producer: Optional["ProducerProto"],
correlation_id: Optional[str] = None,
**kwargs: Any,
) -> Any:
"""Publish message directly."""
assert producer, NOT_CONNECTED_YET # nosec B101
request = producer.request
for m in self._middlewares:
request = partial(m(None).publish_scope, request)
published_msg = await request(
msg,
correlation_id=correlation_id,
**kwargs,
)
async with AsyncExitStack() as stack:
return_msg = return_input
for m in self._middlewares:
mid = m(published_msg)
await stack.enter_async_context(mid)
return_msg = partial(mid.consume_scope, return_msg)
parsed_msg: StreamMessage[Any] = await producer._parser(published_msg)
parsed_msg._decoded_body = await producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)
|
ping abstractmethod
async
Check connection alive.
Source code in faststream/broker/core/usecase.py
| @abstractmethod
async def ping(self, timeout: Optional[float]) -> bool:
"""Check connection alive."""
raise NotImplementedError()
|
get_fmt
Source code in faststream/confluent/broker/logging.py
| def get_fmt(self) -> str:
return (
"%(asctime)s %(levelname)-8s - "
+ f"%(topic)-{self._max_topic_len}s | "
+ (f"%(group_id)-{self._max_group_len}s | " if self._max_group_len else "")
+ f"%(message_id)-{self.__max_msg_id_ln}s "
+ "- %(message)s"
)
|