Skip to content

AsyncConfluentProducer

faststream.confluent.client.AsyncConfluentProducer #

AsyncConfluentProducer(*, logger, config, bootstrap_servers='localhost', client_id=None, metadata_max_age_ms=300000, request_timeout_ms=40000, acks=EMPTY, compression_type=None, partitioner='consistent_random', max_request_size=1048576, linger_ms=0, retry_backoff_ms=100, security_protocol='PLAINTEXT', connections_max_idle_ms=540000, enable_idempotence=False, transactional_id=None, transaction_timeout_ms=60000, allow_auto_create_topics=True, sasl_mechanism=None, sasl_plain_password=None, sasl_plain_username=None)

An asynchronous Python Kafka client using the "confluent-kafka" package.

Source code in faststream/confluent/client.py
def __init__(
    self,
    *,
    logger: Optional["LoggerProto"],
    config: config_module.ConfluentFastConfig,
    bootstrap_servers: Union[str, List[str]] = "localhost",
    client_id: Optional[str] = None,
    metadata_max_age_ms: int = 300000,
    request_timeout_ms: int = 40000,
    acks: Any = EMPTY,
    compression_type: Optional[str] = None,
    partitioner: str = "consistent_random",
    max_request_size: int = 1048576,
    linger_ms: int = 0,
    retry_backoff_ms: int = 100,
    security_protocol: str = "PLAINTEXT",
    connections_max_idle_ms: int = 540000,
    enable_idempotence: bool = False,
    transactional_id: Optional[Union[str, int]] = None,
    transaction_timeout_ms: int = 60000,
    allow_auto_create_topics: bool = True,
    sasl_mechanism: Optional[str] = None,
    sasl_plain_password: Optional[str] = None,
    sasl_plain_username: Optional[str] = None,
) -> None:
    """Create the wrapped ``Producer`` and schedule its polling task.

    The keyword arguments are mapped onto dotted configuration keys and
    layered on top of ``config.as_config_dict()``, so an explicit argument
    overrides the same key coming from ``config``. SASL credentials are only
    added for the ``PLAIN``, ``SCRAM-SHA-256`` and ``SCRAM-SHA-512``
    mechanisms.

    ``asyncio.create_task`` is used at the end, so the constructor has to be
    called while an event loop is running.
    """
    self.logger = logger

    # A collection of hosts is collapsed into one comma-separated string.
    if not isinstance(bootstrap_servers, str) and isinstance(
        bootstrap_servers, Iterable
    ):
        bootstrap_servers = ",".join(bootstrap_servers)

    # "No compression" is passed on as the literal string "none".
    codec = "none" if compression_type is None else compression_type

    # Both the EMPTY sentinel and the "all" alias are sent as -1.
    required_acks = -1 if (acks is EMPTY or acks == "all") else acks

    overrides = {
        "bootstrap.servers": bootstrap_servers,
        "client.id": client_id,
        "metadata.max.age.ms": metadata_max_age_ms,
        "request.timeout.ms": request_timeout_ms,
        "acks": required_acks,
        "compression.type": codec,
        "partitioner": partitioner,
        "message.max.bytes": max_request_size,
        "linger.ms": linger_ms,
        "enable.idempotence": enable_idempotence,
        "transactional.id": transactional_id,
        "transaction.timeout.ms": transaction_timeout_ms,
        "retry.backoff.ms": retry_backoff_ms,
        "security.protocol": security_protocol.lower(),
        "connections.max.idle.ms": connections_max_idle_ms,
        "allow.auto.create.topics": allow_auto_create_topics,
    }

    # Start from the user-supplied config, then let explicit arguments win.
    final_config = {**config.as_config_dict()}
    final_config.update(overrides)

    if sasl_mechanism in ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"):
        final_config["sasl.mechanism"] = sasl_mechanism
        final_config["sasl.username"] = sasl_plain_username
        final_config["sasl.password"] = sasl_plain_password

    self.producer = Producer(final_config, logger=logger)  # type: ignore[call-arg]

    # The flag is set before the task is created; stop() clears it again.
    self.__running = True
    self._poll_task = asyncio.create_task(self._poll_loop())

logger instance-attribute #

logger = logger

producer instance-attribute #

producer = Producer(final_config, logger=logger)

stop async #

stop()

Stop the Kafka producer and flush remaining messages.

Source code in faststream/confluent/client.py
async def stop(self) -> None:
    """Shut the producer down: cancel the poll task, then flush.

    Calling this again after the first shutdown is a no-op.
    """
    if not self.__running:
        return

    self.__running = False

    poll_task = self._poll_task
    if not poll_task.done():
        poll_task.cancel()

    # Flush last so messages still queued in the producer are handed over.
    await call_or_await(self.producer.flush)

send async #

send(topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None, no_confirm=False)

Sends a single message to a Kafka topic.

Source code in faststream/confluent/client.py
async def send(
    self,
    topic: str,
    value: Optional[Union[str, bytes]] = None,
    key: Optional[Union[str, bytes]] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None,
    no_confirm: bool = False,
) -> None:
    """Send a single message to a Kafka topic.

    Args:
        topic: Name of the topic passed to ``producer.produce``.
        value: Message payload.
        key: Message key.
        partition: Target partition; left out of the produce call when ``None``.
        timestamp_ms: Forwarded as ``timestamp``; left out when ``None``.
        headers: Message headers as ``(name, value)`` pairs.
        no_confirm: When true, return right after ``produce`` without waiting
            for the delivery report.

    Raises:
        KafkaException: If confirmation is awaited and the delivery report
            carries an error.
    """
    kwargs: _SendKwargs = {
        "value": value,
        "key": key,
        "headers": headers,
    }

    if partition is not None:
        kwargs["partition"] = partition

    if timestamp_ms is not None:
        kwargs["timestamp"] = timestamp_ms

    if not no_confirm:
        result_future: asyncio.Future[Optional[Message]] = (
            asyncio.get_running_loop().create_future()
        )

        def ack_callback(err: Any, msg: Optional[Message]) -> None:
            # The awaiting coroutine may have been cancelled before the
            # delivery report arrives (e.g. a sibling task failed in a task
            # group). Resolving a finished future raises InvalidStateError,
            # which would surface inside the producer's poll/flush call.
            # NOTE(review): if poll/flush run in a worker thread, this
            # callback touches the future off the loop thread - confirm.
            if result_future.done():
                return

            if err or (msg is not None and (err := msg.error())):
                result_future.set_exception(KafkaException(err))
            else:
                result_future.set_result(msg)

        kwargs["on_delivery"] = ack_callback

    # should be sync to prevent segfault
    self.producer.produce(topic, **kwargs)

    if not no_confirm:
        await result_future

create_batch #

create_batch()

Creates a batch for sending multiple messages.

Source code in faststream/confluent/client.py
def create_batch(self) -> "BatchBuilder":
    """Return a freshly constructed ``BatchBuilder``."""
    builder = BatchBuilder()
    return builder

send_batch async #

send_batch(batch, topic, *, partition, no_confirm=False)

Sends a batch of messages to a Kafka topic.

Source code in faststream/confluent/client.py
async def send_batch(
    self,
    batch: "BatchBuilder",
    topic: str,
    *,
    partition: Optional[int],
    no_confirm: bool = False,
) -> None:
    """Publish every message collected in ``batch`` to ``topic``.

    Each entry of ``batch._builder`` is handed to ``self.send`` as its own
    task inside an anyio task group; the method returns once the task group
    exits. ``partition`` and ``no_confirm`` are applied to every message.
    """
    publish = self.send

    async with anyio.create_task_group() as group:
        for entry in batch._builder:
            # Positional order follows send(): topic, value, key, partition,
            # timestamp_ms, headers, no_confirm.
            send_args = (
                topic,
                entry["value"],
                entry["key"],
                partition,
                entry["timestamp_ms"],
                entry["headers"],
                no_confirm,
            )
            group.start_soon(publish, *send_args)

ping async #

ping(timeout=5.0)

Checks connectivity by issuing a list_topics metadata request; returns False if the request fails.

Source code in faststream/confluent/client.py
async def ping(
    self,
    timeout: Optional[float] = 5.0,
) -> bool:
    """Report whether a `list_topics` request succeeds.

    ``timeout`` is forwarded to ``producer.list_topics``; ``None`` is replaced
    with ``-1``. Returns the truthiness of the metadata that comes back, or
    ``False`` when the request raises.
    """
    effective_timeout = -1 if timeout is None else timeout

    try:
        metadata = await call_or_await(
            self.producer.list_topics,
            timeout=effective_timeout,
        )
        # Kept inside the try so a failing conversion also counts as "down".
        is_alive = bool(metadata)
    except Exception:
        is_alive = False

    return is_alive