Skip to content

LoggingListenerProxy

faststream.kafka.listener.LoggingListenerProxy #

LoggingListenerProxy(consumer, logger, listener)

Bases: ConsumerRebalanceListener

Logs partition assignments and passes calls to user-supplied listener.

Source code in faststream/kafka/listener.py
def __init__(
    self,
    consumer: "AIOKafkaConsumer",
    logger: Optional["LoggerProto"],
    listener: Optional[ConsumerRebalanceListener],
):
    self.consumer = consumer
    self.logger = logger
    self.listener = listener
    self._log_unassigned_consumer_task: Optional[asyncio.Task[None]] = None

consumer instance-attribute #

consumer = consumer

logger instance-attribute #

logger = logger

listener instance-attribute #

listener = listener

log_unassigned_consumer async #

log_unassigned_consumer()
Source code in faststream/kafka/listener.py
async def log_unassigned_consumer(self) -> None:
    await asyncio.sleep(self._log_unassigned_consumer_delay_seconds)
    self._log(
        logging.WARNING,
        f"Consumer in group {self.consumer._group_id} has had no partition "
        f"assignments for {self._log_unassigned_consumer_delay_seconds} seconds: "
        f"topics {self.consumer._subscription.topics} may have fewer partitions "
        f"than consumers.",
    )

on_partitions_revoked async #

on_partitions_revoked(revoked)
Source code in faststream/kafka/listener.py
async def on_partitions_revoked(self, revoked: Set["TopicPartition"]) -> None:
    if self.listener:
        call_result = self.listener.on_partitions_revoked(revoked)
        if asyncio.iscoroutine(call_result):
            await call_result

on_partitions_assigned async #

on_partitions_assigned(assigned)
Source code in faststream/kafka/listener.py
async def on_partitions_assigned(self, assigned: Set["TopicPartition"]) -> None:
    self._log(
        logging.INFO,
        f"Consumer {self.consumer._coordinator.member_id} assigned to partitions: "
        f"{assigned}",
    )
    if not assigned:
        self._log(
            logging.WARNING,
            f"Consumer in group {self.consumer._group_id} has no partition assignments - this "
            f"could be temporary, e.g. during a rolling update. A separate warning will be logged if "
            f"this condition persists for {self._log_unassigned_consumer_delay_seconds} seconds.",
        )
        self._log_unassigned_consumer_task = asyncio.create_task(
            self.log_unassigned_consumer()
        )
    elif self._log_unassigned_consumer_task:
        self._log_unassigned_consumer_task.cancel()
        self._log_unassigned_consumer_task = None

    if self.listener:
        call_result = self.listener.on_partitions_assigned(assigned)
        if asyncio.iscoroutine(call_result):
            await call_result