Skip to content

AsyncConfluentFastProducer

faststream.confluent.publisher.producer.AsyncConfluentFastProducer #

AsyncConfluentFastProducer(producer, parser, decoder)

Bases: ProducerProto

A class to represent Kafka producer.

Source code in faststream/confluent/publisher/producer.py
def __init__(
    self,
    producer: "AsyncConfluentProducer",
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    self._producer = producer

    # NOTE: register default parser to be compatible with request
    default = AsyncConfluentParser
    self._parser = resolve_custom_func(parser, default.parse_message)
    self._decoder = resolve_custom_func(decoder, default.decode_message)

publish async #

publish(message, topic, *, key=None, partition=None, timestamp_ms=None, headers=None, correlation_id='', reply_to='', no_confirm=False)

Publish a message to a topic.

Source code in faststream/confluent/publisher/producer.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage",
    topic: str,
    *,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: str = "",
    reply_to: str = "",
    no_confirm: bool = False,
) -> None:
    """Publish a message to a topic."""
    message, content_type = encode_message(message)

    headers_to_send = {
        "content-type": content_type or "",
        "correlation_id": correlation_id,
        **(headers or {}),
    }

    if reply_to:
        headers_to_send["reply_to"] = headers_to_send.get(
            "reply_to",
            reply_to,
        )

    await self._producer.send(
        topic=topic,
        value=message,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=[(i, (j or "").encode()) for i, j in headers_to_send.items()],
        no_confirm=no_confirm,
    )

stop async #

stop()
Source code in faststream/confluent/publisher/producer.py
async def stop(self) -> None:
    await self._producer.stop()

publish_batch async #

publish_batch(*msgs, topic, partition=None, timestamp_ms=None, headers=None, reply_to='', correlation_id='', no_confirm=False)

Publish a batch of messages to a topic.

Source code in faststream/confluent/publisher/producer.py
async def publish_batch(
    self,
    *msgs: "SendableMessage",
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: str = "",
    no_confirm: bool = False,
) -> None:
    """Publish a batch of messages to a topic."""
    batch = self._producer.create_batch()

    headers_to_send = {"correlation_id": correlation_id, **(headers or {})}

    if reply_to:
        headers_to_send["reply_to"] = headers_to_send.get(
            "reply_to",
            reply_to,
        )

    for msg in msgs:
        message, content_type = encode_message(msg)

        if content_type:
            final_headers = {
                "content-type": content_type,
                **headers_to_send,
            }
        else:
            final_headers = headers_to_send.copy()

        batch.append(
            key=None,
            value=message,
            timestamp=timestamp_ms,
            headers=[(i, j.encode()) for i, j in final_headers.items()],
        )

    await self._producer.send_batch(
        batch,
        topic,
        partition=partition,
        no_confirm=no_confirm,
    )

request async #

request(*args, **kwargs)
Source code in faststream/confluent/publisher/producer.py
@override
async def request(self, *args: Any, **kwargs: Any) -> Optional[Any]:
    raise OperationForbiddenError(
        "Kafka doesn't support `request` method without test client."
    )