AsyncConfluentConsumer

faststream.confluent.client.AsyncConfluentConsumer #

AsyncConfluentConsumer(*topics, partitions, logger, config, bootstrap_servers='localhost', client_id='confluent-kafka-consumer', group_id=None, group_instance_id=None, fetch_max_wait_ms=500, fetch_max_bytes=52428800, fetch_min_bytes=1, max_partition_fetch_bytes=1 * 1024 * 1024, retry_backoff_ms=100, auto_offset_reset='latest', enable_auto_commit=True, auto_commit_interval_ms=5000, check_crcs=True, metadata_max_age_ms=5 * 60 * 1000, partition_assignment_strategy='roundrobin', max_poll_interval_ms=300000, session_timeout_ms=10000, heartbeat_interval_ms=3000, security_protocol='PLAINTEXT', connections_max_idle_ms=540000, isolation_level='read_uncommitted', allow_auto_create_topics=True, sasl_mechanism=None, sasl_plain_password=None, sasl_plain_username=None)

An asynchronous Python Kafka client for consuming messages using the "confluent-kafka" package.

Source code in faststream/confluent/client.py
def __init__(
    self,
    *topics: str,
    partitions: Sequence["TopicPartition"],
    logger: Optional["LoggerProto"],
    config: config_module.ConfluentFastConfig,
    bootstrap_servers: Union[str, List[str]] = "localhost",
    client_id: Optional[str] = "confluent-kafka-consumer",
    group_id: Optional[str] = None,
    group_instance_id: Optional[str] = None,
    fetch_max_wait_ms: int = 500,
    fetch_max_bytes: int = 52428800,
    fetch_min_bytes: int = 1,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    retry_backoff_ms: int = 100,
    auto_offset_reset: str = "latest",
    enable_auto_commit: bool = True,
    auto_commit_interval_ms: int = 5000,
    check_crcs: bool = True,
    metadata_max_age_ms: int = 5 * 60 * 1000,
    partition_assignment_strategy: Union[str, List[Any]] = "roundrobin",
    max_poll_interval_ms: int = 300000,
    session_timeout_ms: int = 10000,
    heartbeat_interval_ms: int = 3000,
    security_protocol: str = "PLAINTEXT",
    connections_max_idle_ms: int = 540000,
    isolation_level: str = "read_uncommitted",
    allow_auto_create_topics: bool = True,
    sasl_mechanism: Optional[str] = None,
    sasl_plain_password: Optional[str] = None,
    sasl_plain_username: Optional[str] = None,
) -> None:
    self.logger = logger

    if isinstance(bootstrap_servers, Iterable) and not isinstance(
        bootstrap_servers, str
    ):
        bootstrap_servers = ",".join(bootstrap_servers)

    self.topics = list(topics)
    self.partitions = partitions

    if not isinstance(partition_assignment_strategy, str):
        partition_assignment_strategy = ",".join(
            [
                x if isinstance(x, str) else x().name
                for x in partition_assignment_strategy
            ]
        )

    final_config = config.as_config_dict()

    config_from_params = {
        "allow.auto.create.topics": allow_auto_create_topics,
        "topic.metadata.refresh.interval.ms": 1000,
        "bootstrap.servers": bootstrap_servers,
        "client.id": client_id,
        "group.id": group_id
        or final_config.get("group.id", "faststream-consumer-group"),
        "group.instance.id": group_instance_id
        or final_config.get("group.instance.id", None),
        "fetch.wait.max.ms": fetch_max_wait_ms,
        "fetch.max.bytes": fetch_max_bytes,
        "fetch.min.bytes": fetch_min_bytes,
        "max.partition.fetch.bytes": max_partition_fetch_bytes,
        # "request.timeout.ms": 1000,  # producer only
        "fetch.error.backoff.ms": retry_backoff_ms,
        "auto.offset.reset": auto_offset_reset,
        "enable.auto.commit": enable_auto_commit,
        "auto.commit.interval.ms": auto_commit_interval_ms,
        "check.crcs": check_crcs,
        "metadata.max.age.ms": metadata_max_age_ms,
        "partition.assignment.strategy": partition_assignment_strategy,
        "max.poll.interval.ms": max_poll_interval_ms,
        "session.timeout.ms": session_timeout_ms,
        "heartbeat.interval.ms": heartbeat_interval_ms,
        "security.protocol": security_protocol.lower(),
        "connections.max.idle.ms": connections_max_idle_ms,
        "isolation.level": isolation_level,
    }
    self.allow_auto_create_topics = allow_auto_create_topics
    final_config.update(config_from_params)

    if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]:
        final_config.update(
            {
                "sasl.mechanism": sasl_mechanism,
                "sasl.username": sasl_plain_username,
                "sasl.password": sasl_plain_password,
            }
        )

    self.config = final_config
    self.consumer = Consumer(final_config, logger=self.logger)  # type: ignore[call-arg]

    # We shouldn't read messages and close consumer concurrently
    # https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895
    self._lock = anyio.Lock()
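
This consumer is normally constructed for you by the FastStream Confluent broker when a subscriber is registered, but it can also be instantiated directly. A minimal sketch, assuming the faststream.confluent.config module path and a no-argument ConfluentFastConfig() constructor (verify both against your FastStream version); the topic name, group id, and bootstrap address are placeholders:

import logging

from faststream.confluent import config as config_module  # assumed import path
from faststream.confluent.client import AsyncConfluentConsumer

consumer = AsyncConfluentConsumer(
    "my-topic",                                  # *topics (placeholder)
    partitions=[],                               # no manual partition assignment
    logger=logging.getLogger("my-consumer"),     # any LoggerProto-compatible logger
    config=config_module.ConfluentFastConfig(),  # assumption: no-arg constructor
    bootstrap_servers="localhost:9092",
    group_id="my-consumer-group",
)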

logger instance-attribute #

logger = logger

topics instance-attribute #

topics = list(topics)

partitions instance-attribute #

partitions = partitions

allow_auto_create_topics instance-attribute #

allow_auto_create_topics = allow_auto_create_topics

config instance-attribute #

config = final_config

consumer instance-attribute #

consumer = Consumer(final_config, logger=logger)

topics_to_create property #

topics_to_create

start async #

start()

Starts the Kafka consumer, then subscribes to the given topics or assigns the given partitions.

Source code in faststream/confluent/client.py
async def start(self) -> None:
    """Starts the Kafka consumer and subscribes to the specified topics."""
    if self.allow_auto_create_topics:
        await call_or_await(
            create_topics, self.topics_to_create, self.config, self.logger
        )

    elif self.logger:
        self.logger.log(
            logging.WARNING,
            "Auto create topics is disabled. Make sure the topics exist.",
        )

    if self.topics:
        await call_or_await(self.consumer.subscribe, self.topics)

    elif self.partitions:
        await call_or_await(
            self.consumer.assign, [p.to_confluent() for p in self.partitions]
        )

    else:
        raise SetupError("You must provide either `topics` or `partitions` option.")
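
A short lifecycle sketch using the consumer built above: start() must be called before polling, and stop() should always run on shutdown.

import asyncio

async def run() -> None:
    await consumer.start()     # creates topics if allowed, then subscribes/assigns
    try:
        ...                    # poll with getone()/getmany(), see below
    finally:
        await consumer.stop()  # commit (if auto-commit is on) and close

asyncio.run(run())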

commit async #

commit(asynchronous=True)

Commits the offsets of all messages returned by the last poll operation.

Source code in faststream/confluent/client.py
async def commit(self, asynchronous: bool = True) -> None:
    """Commits the offsets of all messages returned by the last poll operation."""
    await call_or_await(self.consumer.commit, asynchronous=asynchronous)
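
With enable_auto_commit=False, offsets should be committed explicitly once the polled messages have been processed. A sketch; asynchronous=False blocks until the broker acknowledges the commit:

# Inside an async function, after processing the last polled message(s):
await consumer.commit(asynchronous=False)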

stop async #

stop()

Stops the Kafka consumer and releases all resources.

Source code in faststream/confluent/client.py
async def stop(self) -> None:
    """Stops the Kafka consumer and releases all resources."""
    # NOTE: If we don't explicitly call commit and then close the consumer, the confluent consumer gets stuck.
    # We are doing this to avoid the issue.
    enable_auto_commit = self.config["enable.auto.commit"]
    try:
        if enable_auto_commit:
            await self.commit(asynchronous=False)

    except Exception as e:
        # No offset stored issue is not a problem - https://github.com/confluentinc/confluent-kafka-python/issues/295#issuecomment-355907183
        if "No offset stored" in str(e):
            pass
        elif self.logger:
            self.logger.log(
                logging.ERROR,
                "Consumer closing error occurred.",
                exc_info=e,
            )

    # Wrap calls to async to make method cancelable by timeout
    async with self._lock:
        await call_or_await(self.consumer.close)
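
Because the blocking close() call is dispatched asynchronously and guarded by the lock, stop() can be bounded with a timeout scope during shutdown. A sketch using anyio (already used by this module):

import anyio

async def shutdown(timeout: float = 5.0) -> None:
    # Give the consumer up to `timeout` seconds to commit and close cleanly.
    with anyio.move_on_after(timeout):
        await consumer.stop()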

getone async #

getone(timeout=0.1)

Consumes a single message from Kafka.

Source code in faststream/confluent/client.py
async def getone(self, timeout: float = 0.1) -> Optional[Message]:
    """Consumes a single message from Kafka."""
    async with self._lock:
        msg = await call_or_await(self.consumer.poll, timeout)
    return check_msg_error(msg)
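
A typical single-message polling loop: getone() returns None when no message arrives within the timeout, so empty polls are simply skipped. The accessors below (topic(), partition(), value()) come from the confluent-kafka Message class:

async def consume_forever() -> None:
    while True:
        msg = await consumer.getone(timeout=0.5)
        if msg is None:
            continue  # nothing arrived within the timeout
        print(msg.topic(), msg.partition(), msg.value())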

getmany async #

getmany(timeout=0.1, max_records=10)

Consumes a batch of messages from Kafka and returns them as a flat tuple.

Source code in faststream/confluent/client.py
async def getmany(
    self,
    timeout: float = 0.1,
    max_records: Optional[int] = 10,
) -> Tuple[Message, ...]:
    """Consumes a batch of messages from Kafka and groups them by topic and partition."""
    async with self._lock:
        raw_messages: List[Optional[Message]] = await call_or_await(
            self.consumer.consume,  # type: ignore[arg-type]
            num_messages=max_records or 10,
            timeout=timeout,
        )

    return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)
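
A batch-consumption sketch; the returned tuple contains only messages that passed check_msg_error, so it can be shorter than max_records or empty. The handle() call is a hypothetical processing callback:

async def consume_batches() -> None:
    while True:
        batch = await consumer.getmany(timeout=0.5, max_records=100)
        if not batch:
            continue  # no messages within the timeout
        for msg in batch:
            handle(msg)  # hypothetical: your processing logic
        await consumer.commit(asynchronous=False)  # at-least-once delivery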

seek async #

seek(topic, partition, offset)

Seeks to the specified offset in the specified topic and partition.

Source code in faststream/confluent/client.py
async def seek(self, topic: str, partition: int, offset: int) -> None:
    """Seeks to the specified offset in the specified topic and partition."""
    topic_partition = TopicPartition(
        topic=topic, partition=partition, offset=offset
    )
    await call_or_await(self.consumer.seek, topic_partition.to_confluent())
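
Seeking applies only to partitions the consumer currently owns (i.e. after start() has subscribed or assigned them). A sketch that replays one partition from a known offset; the topic, partition, and offset values are placeholders:

# Inside an async function, after start():
await consumer.seek(topic="my-topic", partition=0, offset=42)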