Skip to content

LoggingListenerProxy

faststream.kafka.listener.LoggingListenerProxy #

LoggingListenerProxy(consumer, logger, listener)

Bases: ConsumerRebalanceListener

Logs partition assignments and forwards rebalance callbacks to the user-supplied listener, if one was provided.

Source code in faststream/kafka/listener.py
def __init__(
    self,
    consumer: "AIOKafkaConsumer",
    logger: Optional["LoggerProto"],
    listener: Optional[ConsumerRebalanceListener],
):
    """Keep references to the consumer, the logger and the wrapped listener.

    Args:
        consumer: Consumer instance stored on the proxy as ``self.consumer``.
        logger: Logger stored as ``self.logger``; may be ``None``.
        listener: User-supplied rebalance listener stored as
            ``self.listener``; may be ``None``.
    """
    self.consumer, self.logger, self.listener = consumer, logger, listener

consumer instance-attribute #

consumer = consumer

logger instance-attribute #

logger = logger

listener instance-attribute #

listener = listener

on_partitions_revoked async #

on_partitions_revoked(revoked)
Source code in faststream/kafka/listener.py
async def on_partitions_revoked(self, revoked: Set["TopicPartition"]) -> None:
    """Forward a partition-revocation callback to the user-supplied listener.

    Does nothing when no listener was provided. The listener's
    ``on_partitions_revoked`` may be a plain function or may return any
    awaitable; an awaitable result is awaited before this method returns.

    Args:
        revoked: The partitions handed through unchanged to the listener.
    """
    # Local import: only needed here to detect awaitable results.
    import inspect

    # Compare against None explicitly so that a listener object which
    # happens to be falsy is still invoked.
    if self.listener is None:
        return

    outcome = self.listener.on_partitions_revoked(revoked)
    # isawaitable also covers Futures, Tasks and objects defining
    # __await__, which asyncio.iscoroutine would leave un-awaited.
    if inspect.isawaitable(outcome):
        await outcome

on_partitions_assigned async #

on_partitions_assigned(assigned)
Source code in faststream/kafka/listener.py
async def on_partitions_assigned(self, assigned: Set["TopicPartition"]) -> None:
    """Log the new partition assignment, then forward it to the listener.

    An INFO record with the consumer's member id and the assigned
    partitions is always emitted through ``self._log``. When ``assigned``
    is empty, an additional WARNING record names the consumer group and
    the subscribed topics. Afterwards the user-supplied listener (if any)
    is invoked; an awaitable result is awaited before this method returns.

    Args:
        assigned: The partitions handed through unchanged to the listener.
    """
    # Local import: only needed here to detect awaitable results.
    import inspect

    self._log(
        logging.INFO,
        f"Consumer {self.consumer._coordinator.member_id} assigned to partitions: "
        f"{assigned}",
    )
    if not assigned:
        # An empty assignment is reported separately at WARNING level.
        self._log(
            logging.WARNING,
            f"Consumer in group {self.consumer._group_id} has no partition assignments - topics "
            f"{self.consumer._subscription.topics} may have fewer partitions than consumers",
        )

    # Compare against None explicitly so that a listener object which
    # happens to be falsy is still invoked.
    if self.listener is None:
        return

    outcome = self.listener.on_partitions_assigned(assigned)
    # isawaitable also covers Futures, Tasks and objects defining
    # __await__, which asyncio.iscoroutine would leave un-awaited.
    if inspect.isawaitable(outcome):
        await outcome