Bases: ConsumerRebalanceListener
Logs partition assignments and passes calls to user-supplied listener.
Source code in faststream/kafka/listener.py
| def __init__(
self,
consumer: "AIOKafkaConsumer",
logger: Optional["LoggerProto"],
listener: Optional[ConsumerRebalanceListener],
):
self.consumer = consumer
self.logger = logger
self.listener = listener
|
consumer instance-attribute
logger instance-attribute
listener instance-attribute
on_partitions_revoked async
on_partitions_revoked(revoked)
Source code in faststream/kafka/listener.py
| async def on_partitions_revoked(self, revoked: Set["TopicPartition"]) -> None:
if self.listener:
call_result = self.listener.on_partitions_revoked(revoked)
if asyncio.iscoroutine(call_result):
await call_result
|
on_partitions_assigned async
on_partitions_assigned(assigned)
Source code in faststream/kafka/listener.py
| async def on_partitions_assigned(self, assigned: Set["TopicPartition"]) -> None:
self._log(
logging.INFO,
f"Consumer {self.consumer._coordinator.member_id} assigned to partitions: "
f"{assigned}",
)
if not assigned:
self._log(
logging.WARNING,
f"Consumer in group {self.consumer._group_id} has no partition assignments - topics "
f"{self.consumer._subscription.topics} may have fewer partitions than consumers",
)
if self.listener:
call_result = self.listener.on_partitions_assigned(assigned)
if asyncio.iscoroutine(call_result):
await call_result
|