AsyncConfluentProducer

faststream.confluent.client.AsyncConfluentProducer #

AsyncConfluentProducer(*, loop: Optional[AbstractEventLoop] = None, bootstrap_servers: Union[str, List[str]] = 'localhost', client_id: Optional[str] = None, metadata_max_age_ms: int = 300000, request_timeout_ms: int = 40000, api_version: str = 'auto', acks: Any = _missing, key_serializer: Optional[Callable[[bytes], bytes]] = None, value_serializer: Optional[Callable[[bytes], bytes]] = None, compression_type: Optional[str] = None, max_batch_size: int = 16384, partitioner: str = 'consistent_random', max_request_size: int = 1048576, linger_ms: int = 0, send_backoff_ms: int = 100, retry_backoff_ms: int = 100, security_protocol: str = 'PLAINTEXT', ssl_context: Optional[SSLContext] = None, connections_max_idle_ms: int = 540000, enable_idempotence: bool = False, transactional_id: Optional[Union[str, int]] = None, transaction_timeout_ms: int = 60000, sasl_mechanism: Optional[str] = None, sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, sasl_kerberos_service_name: str = 'kafka', sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None)

An asynchronous Kafka producer client built on the "confluent-kafka" package.

Initializes the AsyncConfluentProducer with the given configuration.

PARAMETER DESCRIPTION
loop

The event loop to use for asynchronous operations.

TYPE: Optional[AbstractEventLoop] DEFAULT: None

bootstrap_servers

A host string or list of bootstrap servers for Kafka; a list is joined into a single comma-separated string.

TYPE: Union[str, List[str]] DEFAULT: 'localhost'

client_id

A unique identifier for the client.

TYPE: Optional[str] DEFAULT: None

metadata_max_age_ms

The maximum age of metadata before a refresh is forced.

TYPE: int DEFAULT: 300000

request_timeout_ms

The maximum time to wait for a request to complete.

TYPE: int DEFAULT: 40000

api_version

The Kafka API version to use.

TYPE: str DEFAULT: 'auto'

acks

The number of acknowledgments the producer requires before considering a request complete; "all" (or leaving it unset) is normalized to -1.

TYPE: Any DEFAULT: _missing

key_serializer

A callable to serialize the key.

TYPE: Optional[Callable[[bytes], bytes]] DEFAULT: None

value_serializer

A callable to serialize the value.

TYPE: Optional[Callable[[bytes], bytes]] DEFAULT: None

compression_type

The compression type for message batches.

TYPE: Optional[str] DEFAULT: None

max_batch_size

The maximum size of a message batch.

TYPE: int DEFAULT: 16384

partitioner

The partitioning strategy to use when sending messages.

TYPE: str DEFAULT: 'consistent_random'

max_request_size

The maximum size of a request in bytes.

TYPE: int DEFAULT: 1048576

linger_ms

The time, in milliseconds, to wait for additional messages before sending a batch.

TYPE: int DEFAULT: 0

send_backoff_ms

The time to back off when sending fails.

TYPE: int DEFAULT: 100

retry_backoff_ms

The time to back off when a retry is needed.

TYPE: int DEFAULT: 100

security_protocol

The security protocol to use.

TYPE: str DEFAULT: 'PLAINTEXT'

ssl_context

The SSL context for secure connections.

TYPE: Optional[SSLContext] DEFAULT: None

connections_max_idle_ms

The maximum time a connection can be idle.

TYPE: int DEFAULT: 540000

enable_idempotence

Whether to enable idempotent producer capabilities.

TYPE: bool DEFAULT: False

transactional_id

The transactional ID for transactional delivery.

TYPE: Optional[Union[str, int]] DEFAULT: None

transaction_timeout_ms

The maximum time allowed for transactions.

TYPE: int DEFAULT: 60000

sasl_mechanism

The SASL mechanism to use for authentication.

TYPE: Optional[str] DEFAULT: None

sasl_plain_password

The password for SASL/PLAIN authentication.

TYPE: Optional[str] DEFAULT: None

sasl_plain_username

The username for SASL/PLAIN authentication.

TYPE: Optional[str] DEFAULT: None

sasl_kerberos_service_name

The Kerberos service name for SASL/GSSAPI.

TYPE: str DEFAULT: 'kafka'

sasl_kerberos_domain_name

The Kerberos domain name for SASL/GSSAPI.

TYPE: Optional[str] DEFAULT: None

sasl_oauth_token_provider

The OAuth token provider for SASL/OAUTHBEARER.

TYPE: Optional[str] DEFAULT: None

RAISES DESCRIPTION
ValueError

If the provided bootstrap_servers is not a string or list of strings.

Source code in faststream/confluent/client.py
def __init__(
    self,
    *,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    bootstrap_servers: Union[str, List[str]] = "localhost",
    client_id: Optional[str] = None,
    metadata_max_age_ms: int = 300000,
    request_timeout_ms: int = 40000,
    api_version: str = "auto",
    acks: Any = _missing,
    key_serializer: Optional[Callable[[bytes], bytes]] = None,
    value_serializer: Optional[Callable[[bytes], bytes]] = None,
    compression_type: Optional[str] = None,
    max_batch_size: int = 16384,
    partitioner: str = "consistent_random",
    max_request_size: int = 1048576,
    linger_ms: int = 0,
    send_backoff_ms: int = 100,
    retry_backoff_ms: int = 100,
    security_protocol: str = "PLAINTEXT",
    ssl_context: Optional[SSLContext] = None,
    connections_max_idle_ms: int = 540000,
    enable_idempotence: bool = False,
    transactional_id: Optional[Union[str, int]] = None,
    transaction_timeout_ms: int = 60000,
    sasl_mechanism: Optional[str] = None,
    sasl_plain_password: Optional[str] = None,
    sasl_plain_username: Optional[str] = None,
    sasl_kerberos_service_name: str = "kafka",
    sasl_kerberos_domain_name: Optional[str] = None,
    sasl_oauth_token_provider: Optional[str] = None,
) -> None:
    """Initializes the AsyncConfluentProducer with the given configuration.

    Args:
        loop (Optional[asyncio.AbstractEventLoop]): The event loop to use for asynchronous operations.
        bootstrap_servers (Union[str, List[str]]): A list of bootstrap servers for Kafka.
        client_id (Optional[str]): A unique identifier for the client.
        metadata_max_age_ms (int): The maximum age of metadata before a refresh is forced.
        request_timeout_ms (int): The maximum time to wait for a request to complete.
        api_version (str): The Kafka API version to use.
        acks (Any): The number of acknowledgments the producer requires before considering a request complete.
        key_serializer (Optional[Callable[[bytes], bytes]]): A callable to serialize the key.
        value_serializer (Optional[Callable[[bytes], bytes]]): A callable to serialize the value.
        compression_type (Optional[str]): The compression type for message batches.
        max_batch_size (int): The maximum size of a message batch.
        partitioner (str): The partitioning strategy to use when sending messages.
        max_request_size (int): The maximum size of a request in bytes.
        linger_ms (int): The time to wait before sending a batch in milliseconds.
        send_backoff_ms (int): The time to back off when sending fails.
        retry_backoff_ms (int): The time to back off when a retry is needed.
        security_protocol (str): The security protocol to use.
        ssl_context (Optional[SSLContext]): The SSL context for secure connections.
        connections_max_idle_ms (int): The maximum time a connection can be idle.
        enable_idempotence (bool): Whether to enable idempotent producer capabilities.
        transactional_id (Optional[Union[str, int]]): The transactional ID for transactional delivery.
        transaction_timeout_ms (int): The maximum time allowed for transactions.
        sasl_mechanism (Optional[str]): The SASL mechanism to use for authentication.
        sasl_plain_password (Optional[str]): The password for SASL/PLAIN authentication.
        sasl_plain_username (Optional[str]): The username for SASL/PLAIN authentication.
        sasl_kerberos_service_name (str): The Kerberos service name for SASL/GSSAPI.
        sasl_kerberos_domain_name (Optional[str]): The Kerberos domain name for SASL/GSSAPI.
        sasl_oauth_token_provider (Optional[str]): The OAuth token provider for SASL/OAUTHBEARER.

    Raises:
        ValueError: If the provided bootstrap_servers is not a string or list of strings.
    """
    if isinstance(bootstrap_servers, Iterable) and not isinstance(
        bootstrap_servers, str
    ):
        bootstrap_servers = ",".join(bootstrap_servers)

    if compression_type is None:
        compression_type = "none"

    if acks is _missing or acks == "all":
        acks = -1

    self.config = {
        # "topic.metadata.refresh.interval.ms": 1000,
        "bootstrap.servers": bootstrap_servers,
        "client.id": client_id,
        "metadata.max.age.ms": metadata_max_age_ms,
        "request.timeout.ms": request_timeout_ms,
        "acks": acks,
        "compression.type": compression_type,
        "partitioner": partitioner,
        "message.max.bytes": max_request_size,
        "linger.ms": linger_ms,
        "enable.idempotence": enable_idempotence,
        "transactional.id": transactional_id,
        "transaction.timeout.ms": transaction_timeout_ms,
        "retry.backoff.ms": retry_backoff_ms,
        "security.protocol": security_protocol.lower(),
        "connections.max.idle.ms": connections_max_idle_ms,
        "sasl.kerberos.service.name": sasl_kerberos_service_name,
    }
    if sasl_mechanism:
        self.config.update(
            {
                "sasl.mechanism": sasl_mechanism,
                "sasl.username": sasl_plain_username,
                "sasl.password": sasl_plain_password,
            }
        )

    self.producer = Producer(self.config)
    # self.producer.init_transactions()
    self.producer.list_topics()
    self.loop = loop or asyncio.get_event_loop()
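
For orientation, a minimal construction sketch follows. The broker addresses and SASL credentials are placeholders, and because the constructor calls list_topics(), a reachable broker is required. The comments trace the normalization performed above: a server list is comma-joined, compression_type=None becomes "none", and acks="all" becomes -1.

from faststream.confluent.client import AsyncConfluentProducer

producer = AsyncConfluentProducer(
    bootstrap_servers=["broker-1:9092", "broker-2:9092"],  # joined to "broker-1:9092,broker-2:9092"
    acks="all",                          # normalized to -1
    compression_type=None,               # normalized to "none"
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",              # adds the sasl.* keys to self.config
    sasl_plain_username="svc-producer",  # placeholder credentials
    sasl_plain_password="changeme",
)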

config instance-attribute #

config = {'bootstrap.servers': bootstrap_servers, 'client.id': client_id, 'metadata.max.age.ms': metadata_max_age_ms, 'request.timeout.ms': request_timeout_ms, 'acks': acks, 'compression.type': compression_type, 'partitioner': partitioner, 'message.max.bytes': max_request_size, 'linger.ms': linger_ms, 'enable.idempotence': enable_idempotence, 'transactional.id': transactional_id, 'transaction.timeout.ms': transaction_timeout_ms, 'retry.backoff.ms': retry_backoff_ms, 'security.protocol': security_protocol.lower(), 'connections.max.idle.ms': connections_max_idle_ms, 'sasl.kerberos.service.name': sasl_kerberos_service_name}

loop instance-attribute #

loop = loop or get_event_loop()

producer instance-attribute #

producer = Producer(config)

create_batch #

create_batch() -> BatchBuilder

Creates a batch for sending multiple messages.

RETURNS DESCRIPTION
BatchBuilder

An instance of BatchBuilder for building message batches.

TYPE: BatchBuilder

Source code in faststream/confluent/client.py
def create_batch(self) -> BatchBuilder:
    """Creates a batch for sending multiple messages.

    Returns:
        BatchBuilder: An instance of BatchBuilder for building message batches.
    """
    return BatchBuilder()
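
A short usage sketch, continuing from a producer constructed as above and assuming BatchBuilder mirrors aiokafka's keyword-only append(timestamp=..., key=..., value=..., headers=...) interface (an assumption, not a confirmed signature):

# Assumed append(...) signature; keys and values are placeholders.
batch = producer.create_batch()
batch.append(timestamp=None, key=b"user-1", value=b"first event", headers=[])
batch.append(timestamp=None, key=b"user-2", value=b"second event", headers=[])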

send async #

send(topic: str, value: Optional[Union[str, bytes]] = None, key: Optional[Union[str, bytes]] = None, partition: Optional[int] = None, timestamp_ms: Optional[int] = None, headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None) -> None

Sends a single message to a Kafka topic.

PARAMETER DESCRIPTION
topic

The topic to send the message to.

TYPE: str

value

The message value.

TYPE: Optional[Union[str, bytes]] DEFAULT: None

key

The message key.

TYPE: Optional[Union[str, bytes]] DEFAULT: None

partition

The partition to send the message to.

TYPE: Optional[int] DEFAULT: None

timestamp_ms

The timestamp of the message in milliseconds.

TYPE: Optional[int] DEFAULT: None

headers

A list of headers for the message.

TYPE: Optional[List[Tuple[str, Union[str, bytes]]]] DEFAULT: None

Source code in faststream/confluent/client.py
async def send(
    self,
    topic: str,
    value: Optional[Union[str, bytes]] = None,
    key: Optional[Union[str, bytes]] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None,
) -> None:
    """Sends a single message to a Kafka topic.

    Args:
        topic (str): The topic to send the message to.
        value (Optional[Union[str, bytes]]): The message value.
        key (Optional[Union[str, bytes]]): The message key.
        partition (Optional[int]): The partition to send the message to.
        timestamp_ms (Optional[int]): The timestamp of the message in milliseconds.
        headers (Optional[List[Tuple[str, Union[str, bytes]]]]): A list of headers for the message.
    """
    d = locals()
    d.pop("topic")
    d.pop("timestamp_ms")
    d.pop("self")
    kwargs = {k: v for k, v in d.items() if v is not None}
    if timestamp_ms is not None:
        kwargs["timestamp"] = timestamp_ms

    # result = self.loop.create_future()
    # def ack(err, msg):
    #     print("At msg on_delivery callback")
    #     if err:
    #         print("Error at ack")
    #         self.loop.call_soon_threadsafe(result.set_exception, KafkaException(err))
    #     else:
    #         print("All good at ack")
    #         self.loop.call_soon_threadsafe(result.set_result, msg)

    self.producer.produce(
        topic,
        # on_delivery=ack,
        **kwargs,
    )
    self.producer.poll(0)
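
A self-contained usage sketch; the topic name and payload are illustrative. Note that send() only enqueues the message (produce() plus a non-blocking poll(0)); delivery is not awaited here, and stop() flushes whatever is still queued.

import asyncio

from faststream.confluent.client import AsyncConfluentProducer

async def main() -> None:
    producer = AsyncConfluentProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    await producer.send(
        "user-events",                   # illustrative topic name
        value=b'{"user_id": 1}',
        key=b"user-1",
        headers=[("content-type", "application/json")],
    )
    await producer.stop()                # flushes the delivery queue

asyncio.run(main())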

send_batch async #

send_batch(batch: BatchBuilder, topic: str, *, partition: Optional[int]) -> None

Sends a batch of messages to a Kafka topic.

PARAMETER DESCRIPTION
batch

The batch of messages to send.

TYPE: BatchBuilder

topic

The topic to send the messages to.

TYPE: str

partition

The partition to send the messages to.

TYPE: Optional[int]

Source code in faststream/confluent/client.py
async def send_batch(
    self, batch: BatchBuilder, topic: str, *, partition: Optional[int]
) -> None:
    """Sends a batch of messages to a Kafka topic.

    Args:
        batch (BatchBuilder): The batch of messages to send.
        topic (str): The topic to send the messages to.
        partition (Optional[int]): The partition to send the messages to.
    """
    tasks = [
        self.send(
            topic=topic,
            partition=partition,
            timestamp_ms=msg.timestamp,
            key=msg.key,
            value=msg.value,
            headers=msg.headers,  # type: ignore[arg-type]
        )
        for msg in batch._builder
    ]
    await asyncio.gather(*tasks)
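
An end-to-end batch sketch; the topic and payloads are placeholders, and the append(...) signature is assumed as in the create_batch example above. Each queued message becomes its own send() call, gathered concurrently; partition is keyword-only with no default, and passing None defers to the configured partitioner.

async def publish_batch(producer: AsyncConfluentProducer) -> None:
    batch = producer.create_batch()
    for i in range(3):
        # Placeholder payloads; assumed append(...) signature.
        batch.append(timestamp=None, key=None, value=f"event-{i}".encode(), headers=[])
    # partition=None lets the configured partitioner choose.
    await producer.send_batch(batch, "user-events", partition=None)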

start async #

start() -> None

Start the Kafka producer.

Source code in faststream/confluent/client.py
async def start(self) -> None:
    """Start the Kafka producer."""
    pass

stop async #

stop() -> None

Stop the Kafka producer and flush remaining messages.

Source code in faststream/confluent/client.py
async def stop(self) -> None:
    """Stop the Kafka producer and flush remaining messages."""
    self.producer.flush()
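
A lifecycle sketch tying the pieces together. start() is currently a no-op kept for API parity, so the important call is stop(), whose flush() blocks until the internal delivery queue drains; wrapping the sends in try/finally ensures the flush always runs. The broker address and topic are placeholders.

from faststream.confluent.client import AsyncConfluentProducer

async def produce_safely() -> None:
    producer = AsyncConfluentProducer(bootstrap_servers="localhost:9092")
    await producer.start()               # no-op today, kept for API parity
    try:
        await producer.send("audit-log", value=b"shutting down cleanly")
    finally:
        await producer.stop()            # flush() blocks until the queue drains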