faststream.confluent.client.AsyncConfluentConsumer #

AsyncConfluentConsumer(*topics: str, loop: Optional[AbstractEventLoop] = None, bootstrap_servers: Union[str, List[str]] = 'localhost', client_id: Optional[str] = 'confluent-kafka-consumer', group_id: Optional[str] = None, group_instance_id: Optional[str] = None, key_deserializer: Optional[Callable[[bytes], bytes]] = None, value_deserializer: Optional[Callable[[bytes], bytes]] = None, fetch_max_wait_ms: int = 500, fetch_max_bytes: int = 52428800, fetch_min_bytes: int = 1, max_partition_fetch_bytes: int = 1 * 1024 * 1024, request_timeout_ms: int = 40 * 1000, retry_backoff_ms: int = 100, auto_offset_reset: str = 'latest', enable_auto_commit: bool = True, auto_commit_interval_ms: int = 5000, check_crcs: bool = True, metadata_max_age_ms: int = 5 * 60 * 1000, partition_assignment_strategy: Union[str, List[Any]] = 'roundrobin', max_poll_interval_ms: int = 300000, rebalance_timeout_ms: Optional[int] = None, session_timeout_ms: int = 10000, heartbeat_interval_ms: int = 3000, consumer_timeout_ms: int = 200, max_poll_records: Optional[int] = None, ssl_context: Optional[SSLContext] = None, security_protocol: str = 'PLAINTEXT', api_version: str = 'auto', exclude_internal_topics: bool = True, connections_max_idle_ms: int = 540000, isolation_level: str = 'read_uncommitted', sasl_mechanism: Optional[str] = None, sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, sasl_kerberos_service_name: str = 'kafka', sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None)

An asynchronous Python Kafka client for consuming messages using the "confluent-kafka" package.

Initializes the AsyncConfluentConsumer with the given configuration and subscribes to the specified topics.
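
A minimal usage sketch (the broker address, topic, and group id below are illustrative assumptions, not defaults of the class):

import asyncio

from faststream.confluent.client import AsyncConfluentConsumer

async def main() -> None:
    consumer = AsyncConfluentConsumer(
        "my-topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        auto_offset_reset="earliest",
    )
    await consumer.start()  # subscribes to "my-topic"
    try:
        msg = await consumer.getone()  # polls until a valid message arrives
        print(msg.topic(), msg.value())
    finally:
        await consumer.stop()

asyncio.run(main())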

PARAMETER DESCRIPTION
topics

One or more topic names to subscribe to.

TYPE: str DEFAULT: ()

loop

The event loop to use for asynchronous operations.

TYPE: Optional[AbstractEventLoop] DEFAULT: None

bootstrap_servers

A bootstrap server address, or a list of addresses, for the Kafka cluster; a list is joined into a single comma-separated string.

TYPE: Union[str, List[str]] DEFAULT: 'localhost'

client_id

A unique identifier for the client.

TYPE: Optional[str] DEFAULT: 'confluent-kafka-consumer'

group_id

The consumer group ID.

TYPE: Optional[str] DEFAULT: None

group_instance_id

A unique identifier for the consumer instance within a group.

TYPE: Optional[str] DEFAULT: None

key_deserializer

A callable to deserialize the key.

TYPE: Optional[Callable[[bytes], bytes]] DEFAULT: None

value_deserializer

A callable to deserialize the value.

TYPE: Optional[Callable[[bytes], bytes]] DEFAULT: None

fetch_max_wait_ms

The maximum time to block waiting for min.bytes data.

TYPE: int DEFAULT: 500

fetch_max_bytes

The maximum amount of data the server should return for a fetch request.

TYPE: int DEFAULT: 52428800

fetch_min_bytes

The minimum amount of data the server should return for a fetch request.

TYPE: int DEFAULT: 1

max_partition_fetch_bytes

The maximum amount of data per-partition the server will return.

TYPE: int DEFAULT: 1 * 1024 * 1024

request_timeout_ms

The maximum time to wait for a request to complete.

TYPE: int DEFAULT: 40 * 1000

retry_backoff_ms

The time to back off when a retry is needed.

TYPE: int DEFAULT: 100

auto_offset_reset

What to do when there is no initial offset in Kafka or if the current offset does not exist.

TYPE: str DEFAULT: 'latest'

enable_auto_commit

If true, the consumer's offset will be periodically committed in the background.

TYPE: bool DEFAULT: True

auto_commit_interval_ms

The frequency in milliseconds that the consumer offsets are auto-committed to Kafka.

TYPE: int DEFAULT: 5000

check_crcs

Automatically check the CRC32 of the records consumed.

TYPE: bool DEFAULT: True

metadata_max_age_ms

The maximum age of metadata before a refresh is forced.

TYPE: int DEFAULT: 5 * 60 * 1000

partition_assignment_strategy

The name of the partition assignment strategy to use.

TYPE: Union[str, List[Any]] DEFAULT: 'roundrobin'

max_poll_interval_ms

The maximum delay between invocations of poll() when using consumer group management.

TYPE: int DEFAULT: 300000

rebalance_timeout_ms

The maximum time that the group coordinator will wait for each member to rejoin when rebalancing.

TYPE: Optional[int] DEFAULT: None

session_timeout_ms

The timeout used to detect consumer failures when using Kafka's group management facility.

TYPE: int DEFAULT: 10000

heartbeat_interval_ms

The expected time between heartbeats to the group coordinator when using Kafka's group management facilities.

TYPE: int DEFAULT: 3000

consumer_timeout_ms

The maximum time to block in the consumer waiting for a message.

TYPE: int DEFAULT: 200

max_poll_records

The maximum number of records returned in a single call to poll().

TYPE: Optional[int] DEFAULT: None

ssl_context

The SSL context for secure connections.

TYPE: Optional[SSLContext] DEFAULT: None

security_protocol

The security protocol to use.

TYPE: str DEFAULT: 'PLAINTEXT'

api_version

The Kafka API version to use.

TYPE: str DEFAULT: 'auto'

exclude_internal_topics

Whether internal topics (such as offsets) should be excluded from the subscription.

TYPE: bool DEFAULT: True

connections_max_idle_ms

The maximum time a connection can be idle.

TYPE: int DEFAULT: 540000

isolation_level

The isolation level for reading data.

TYPE: str DEFAULT: 'read_uncommitted'

sasl_mechanism

The SASL mechanism to use for authentication.

TYPE: Optional[str] DEFAULT: None

sasl_plain_password

The password for SASL/PLAIN authentication.

TYPE: Optional[str] DEFAULT: None

sasl_plain_username

The username for SASL/PLAIN authentication.

TYPE: Optional[str] DEFAULT: None

sasl_kerberos_service_name

The Kerberos service name for SASL/GSSAPI.

TYPE: str DEFAULT: 'kafka'

sasl_kerberos_domain_name

The Kerberos domain name for SASL/GSSAPI.

TYPE: Optional[str] DEFAULT: None

sasl_oauth_token_provider

The OAuth token provider for SASL/OAUTHBEARER.

TYPE: Optional[str] DEFAULT: None

RAISES DESCRIPTION
ValueError

If the provided bootstrap_servers is not a string or list of strings.

Source code in faststream/confluent/client.py
def __init__(
    self,
    *topics: str,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    bootstrap_servers: Union[str, List[str]] = "localhost",
    client_id: Optional[str] = "confluent-kafka-consumer",
    group_id: Optional[str] = None,
    group_instance_id: Optional[str] = None,
    key_deserializer: Optional[Callable[[bytes], bytes]] = None,
    value_deserializer: Optional[Callable[[bytes], bytes]] = None,
    fetch_max_wait_ms: int = 500,
    fetch_max_bytes: int = 52428800,
    fetch_min_bytes: int = 1,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    request_timeout_ms: int = 40 * 1000,
    retry_backoff_ms: int = 100,
    auto_offset_reset: str = "latest",
    enable_auto_commit: bool = True,
    auto_commit_interval_ms: int = 5000,
    check_crcs: bool = True,
    metadata_max_age_ms: int = 5 * 60 * 1000,
    partition_assignment_strategy: Union[str, List[Any]] = "roundrobin",
    max_poll_interval_ms: int = 300000,
    rebalance_timeout_ms: Optional[int] = None,
    session_timeout_ms: int = 10000,
    heartbeat_interval_ms: int = 3000,
    consumer_timeout_ms: int = 200,
    max_poll_records: Optional[int] = None,
    ssl_context: Optional[SSLContext] = None,
    security_protocol: str = "PLAINTEXT",
    api_version: str = "auto",
    exclude_internal_topics: bool = True,
    connections_max_idle_ms: int = 540000,
    isolation_level: str = "read_uncommitted",
    sasl_mechanism: Optional[str] = None,
    sasl_plain_password: Optional[str] = None,
    sasl_plain_username: Optional[str] = None,
    sasl_kerberos_service_name: str = "kafka",
    sasl_kerberos_domain_name: Optional[str] = None,
    sasl_oauth_token_provider: Optional[str] = None,
) -> None:
    """Initializes the AsyncConfluentConsumer with the given configuration and subscribes to the specified topics.

    Args:
        topics (str): One or more topic names to subscribe to.
        loop (Optional[asyncio.AbstractEventLoop]): The event loop to use for asynchronous operations.
        bootstrap_servers (Union[str, List[str]]): A bootstrap server address, or a list of addresses, for the Kafka cluster.
        client_id (Optional[str]): A unique identifier for the client.
        group_id (Optional[str]): The consumer group ID.
        group_instance_id (Optional[str]): A unique identifier for the consumer instance within a group.
        key_deserializer (Optional[Callable[[bytes], bytes]]): A callable to deserialize the key.
        value_deserializer (Optional[Callable[[bytes], bytes]]): A callable to deserialize the value.
        fetch_max_wait_ms (int): The maximum time to block waiting for min.bytes data.
        fetch_max_bytes (int): The maximum amount of data the server should return for a fetch request.
        fetch_min_bytes (int): The minimum amount of data the server should return for a fetch request.
        max_partition_fetch_bytes (int): The maximum amount of data per-partition the server will return.
        request_timeout_ms (int): The maximum time to wait for a request to complete.
        retry_backoff_ms (int): The time to back off when a retry is needed.
        auto_offset_reset (str): What to do when there is no initial offset in Kafka or if the current offset does not exist.
        enable_auto_commit (bool): If true, the consumer's offset will be periodically committed in the background.
        auto_commit_interval_ms (int): The frequency in milliseconds that the consumer offsets are auto-committed to Kafka.
        check_crcs (bool): Automatically check the CRC32 of the records consumed.
        metadata_max_age_ms (int): The maximum age of metadata before a refresh is forced.
        partition_assignment_strategy (Union[str, List[Any]]): The name of the partition assignment strategy to use.
        max_poll_interval_ms (int): The maximum delay between invocations of poll() when using consumer group management.
        rebalance_timeout_ms (Optional[int]): The maximum time that the group coordinator will wait for each member to rejoin when rebalancing.
        session_timeout_ms (int): The timeout used to detect consumer failures when using Kafka's group management facility.
        heartbeat_interval_ms (int): The expected time between heartbeats to the group coordinator when using Kafka's group management facilities.
        consumer_timeout_ms (int): The maximum time to block in the consumer waiting for a message.
        max_poll_records (Optional[int]): The maximum number of records returned in a single call to poll().
        ssl_context (Optional[SSLContext]): The SSL context for secure connections.
        security_protocol (str): The security protocol to use.
        api_version (str): The Kafka API version to use.
        exclude_internal_topics (bool): Whether internal topics (such as offsets) should be excluded from the subscription.
        connections_max_idle_ms (int): The maximum time a connection can be idle.
        isolation_level (str): The isolation level for reading data.
        sasl_mechanism (Optional[str]): The SASL mechanism to use for authentication.
        sasl_plain_password (Optional[str]): The password for SASL/PLAIN authentication.
        sasl_plain_username (Optional[str]): The username for SASL/PLAIN authentication.
        sasl_kerberos_service_name (str): The Kerberos service name for SASL/GSSAPI.
        sasl_kerberos_domain_name (Optional[str]): The Kerberos domain name for SASL/GSSAPI.
        sasl_oauth_token_provider (Optional[str]): The OAuth token provider for SASL/OAUTHBEARER.

    Raises:
        ValueError: If the provided bootstrap_servers is not a string or list of strings.
    """
    if group_id is None:
        group_id = "confluent-kafka-consumer-group"
    if isinstance(bootstrap_servers, Iterable) and not isinstance(
        bootstrap_servers, str
    ):
        bootstrap_servers = ",".join(bootstrap_servers)
    self.topics = list(topics)
    if not isinstance(partition_assignment_strategy, str):
        partition_assignment_strategy = ",".join(
            [
                x if isinstance(x, str) else x().name
                for x in partition_assignment_strategy
            ]
        )
    self.config = {
        "allow.auto.create.topics": True,
        # "topic.metadata.refresh.interval.ms": 1000,
        "bootstrap.servers": bootstrap_servers,
        "client.id": client_id,
        "group.id": group_id,
        "group.instance.id": group_instance_id,
        "fetch.wait.max.ms": fetch_max_wait_ms,
        "fetch.max.bytes": fetch_max_bytes,
        "fetch.min.bytes": fetch_min_bytes,
        "max.partition.fetch.bytes": max_partition_fetch_bytes,
        # "request.timeout.ms": request_timeout_ms,
        "fetch.error.backoff.ms": retry_backoff_ms,
        "auto.offset.reset": auto_offset_reset,
        "enable.auto.commit": enable_auto_commit,
        "auto.commit.interval.ms": auto_commit_interval_ms,
        "check.crcs": check_crcs,
        "metadata.max.age.ms": metadata_max_age_ms,
        "partition.assignment.strategy": partition_assignment_strategy,
        "max.poll.interval.ms": max_poll_interval_ms,
        "session.timeout.ms": session_timeout_ms,
        "heartbeat.interval.ms": heartbeat_interval_ms,
        "security.protocol": security_protocol.lower(),
        "connections.max.idle.ms": connections_max_idle_ms,
        "isolation.level": isolation_level,
        "sasl.kerberos.service.name": sasl_kerberos_service_name,
    }
    if sasl_mechanism:
        self.config.update(
            {
                "sasl.mechanism": sasl_mechanism,
                "sasl.username": sasl_plain_username,
                "sasl.password": sasl_plain_password,
            }
        )

    self.loop = loop or asyncio.get_event_loop()

    create_topics(topics=self.topics, config=self.config)
    self.consumer = Consumer(self.config)
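
Two input normalizations above are worth noting: a list of bootstrap servers is joined into the comma-separated string the underlying librdkafka configuration expects, and a non-string partition_assignment_strategy is flattened the same way. A sketch of the first (host names are illustrative; the constructor also eagerly calls create_topics and builds the Consumer, so the brokers must be reachable):

consumer = AsyncConfluentConsumer(
    "my-topic",
    bootstrap_servers=["host1:9092", "host2:9092"],
)
# the list was joined for the underlying config
assert consumer.config["bootstrap.servers"] == "host1:9092,host2:9092"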

config instance-attribute #

config = {'allow.auto.create.topics': True, 'bootstrap.servers': bootstrap_servers, 'client.id': client_id, 'group.id': group_id, 'group.instance.id': group_instance_id, 'fetch.wait.max.ms': fetch_max_wait_ms, 'fetch.max.bytes': fetch_max_bytes, 'fetch.min.bytes': fetch_min_bytes, 'max.partition.fetch.bytes': max_partition_fetch_bytes, 'fetch.error.backoff.ms': retry_backoff_ms, 'auto.offset.reset': auto_offset_reset, 'enable.auto.commit': enable_auto_commit, 'auto.commit.interval.ms': auto_commit_interval_ms, 'check.crcs': check_crcs, 'metadata.max.age.ms': metadata_max_age_ms, 'partition.assignment.strategy': partition_assignment_strategy, 'max.poll.interval.ms': max_poll_interval_ms, 'session.timeout.ms': session_timeout_ms, 'heartbeat.interval.ms': heartbeat_interval_ms, 'security.protocol': security_protocol.lower(), 'connections.max.idle.ms': connections_max_idle_ms, 'isolation.level': isolation_level, 'sasl.kerberos.service.name': sasl_kerberos_service_name}

consumer instance-attribute #

consumer = Consumer(config)

loop instance-attribute #

loop = loop or get_event_loop()

topics instance-attribute #

topics = list(topics)

commit async #

commit() -> None

Commits the offsets of all messages returned by the last poll operation.

Source code in faststream/confluent/client.py
async def commit(self) -> None:
    """Commits the offsets of all messages returned by the last poll operation."""
    await call_or_await(self.consumer.commit)
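
With enable_auto_commit=False, commit() enables at-least-once processing: handle a message first, commit its offset after. A sketch (handle is a hypothetical user-defined function; topic and group id are illustrative):

async def process_one() -> None:
    consumer = AsyncConfluentConsumer(
        "my-topic",
        group_id="my-group",
        enable_auto_commit=False,  # offsets advance only on explicit commit()
    )
    await consumer.start()
    try:
        msg = await consumer.getone()
        handle(msg)              # hypothetical processing step
        await consumer.commit()  # commit only after successful processing
    finally:
        await consumer.stop()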

getmany async #

getmany(timeout_ms: int = 0, max_records: Optional[int] = 10) -> Dict[TopicPartition, List[Message]]

Consumes a batch of messages from Kafka and groups them by topic and partition.

PARAMETER DESCRIPTION
timeout_ms

The timeout in milliseconds to wait for messages.

TYPE: int DEFAULT: 0

max_records

The maximum number of messages to return.

TYPE: Optional[int] DEFAULT: 10

RETURNS DESCRIPTION
Dict[TopicPartition, List[Message]]

Dict[TopicPartition, List[Message]]: A dictionary where keys are TopicPartition named tuples and values are lists of messages.

Source code in faststream/confluent/client.py
async def getmany(
    self,
    timeout_ms: int = 0,
    max_records: Optional[int] = 10,
) -> Dict[TopicPartition, List[Message]]:
    """Consumes a batch of messages from Kafka and groups them by topic and partition.

    Args:
        timeout_ms (int): The timeout in milliseconds to wait for messages.
        max_records (Optional[int]): The maximum number of messages to return.

    Returns:
        Dict[TopicPartition, List[Message]]: A dictionary where keys are TopicPartition named tuples and values are lists of messages.
    """
    raw_messages: List[Optional[Message]] = await call_or_await(
        self.consumer.consume,
        num_messages=max_records or 10,
        timeout=timeout_ms / 1000,
    )

    validated_messages: Iterable[Message] = filter(
        lambda x: x is not None,
        map(check_msg_error, raw_messages),
    )

    messages: DefaultDict[TopicPartition, List[Message]] = defaultdict(list)
    for record in validated_messages:
        tp = TopicPartition(topic=record.topic(), partition=record.partition())  # type: ignore[arg-type]
        messages[tp].append(record)

    return messages
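
Because the result is keyed by TopicPartition, a batch can be handled partition by partition, preserving per-partition ordering. A minimal sketch, assuming consumer is an already-started AsyncConfluentConsumer:

async def drain_once(consumer: AsyncConfluentConsumer) -> None:
    batches = await consumer.getmany(timeout_ms=1000, max_records=100)
    for tp, records in batches.items():
        # records within one TopicPartition keep their broker order
        print(f"{tp.topic}[{tp.partition}]: {len(records)} message(s)")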

getone async #

getone(timeout_ms: int = 1000) -> Message

Consumes a single message from Kafka.

PARAMETER DESCRIPTION
timeout_ms

The timeout in milliseconds for each underlying poll attempt.

TYPE: int DEFAULT: 1000

RETURNS DESCRIPTION
Message

The consumed message.

TYPE: Message

Source code in faststream/confluent/client.py
async def getone(self, timeout_ms: int = 1000) -> Message:
    """Consumes a single message from Kafka.

    Args:
        timeout_ms (int): The timeout in milliseconds for each underlying poll attempt.

    Returns:
        Message: The consumed message.
    """
    while True:
        timeout = timeout_ms / 1000
        msg = await call_or_await(self.consumer.poll, timeout)
        if (record := check_msg_error(msg)) is not None:
            return record
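
Note that timeout_ms bounds each underlying poll attempt, not the total wait: the loop above retries until check_msg_error yields a valid message, so getone() never returns None. A sketch, assuming an already-started consumer:

async def print_next(consumer: AsyncConfluentConsumer) -> None:
    msg = await consumer.getone(timeout_ms=500)  # 0.5 s per poll attempt
    print(msg.key(), msg.value())  # confluent_kafka.Message accessors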

start async #

start() -> None

Starts the Kafka consumer and subscribes to the specified topics.

Source code in faststream/confluent/client.py
async def start(self) -> None:
    """Starts the Kafka consumer and subscribes to the specified topics."""
    # create_topics(topics=self.topics, config=self.config)
    # await call_or_await(create_topics)(topics=self.topics, config=self.config)
    await call_or_await(self.consumer.subscribe, self.topics)

stop async #

stop() -> None

Stops the Kafka consumer and releases all resources.

Source code in faststream/confluent/client.py
async def stop(self) -> None:
    """Stops the Kafka consumer and releases all resources."""
    enable_auto_commit = self.config["enable.auto.commit"]
    try:
        if enable_auto_commit:
            await call_or_await(self.consumer.commit, asynchronous=False)
    except Exception as e:
        # No offset stored issue is not a problem - https://github.com/confluentinc/confluent-kafka-python/issues/295#issuecomment-355907183
        if "No offset stored" in str(e):
            pass
        else:
            raise e

    await call_or_await(self.consumer.close)
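
Because stop() issues a final synchronous commit when auto-commit is enabled (covering messages consumed since the last auto-commit tick) before closing, it belongs in a finally block. A consume-loop sketch (process is a hypothetical handler; topic and group id are illustrative):

async def run() -> None:
    consumer = AsyncConfluentConsumer("my-topic", group_id="my-group")
    await consumer.start()
    try:
        while True:
            msg = await consumer.getone()
            process(msg)  # hypothetical handler
    finally:
        await consumer.stop()  # final commit (if auto-commit) + close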