Skip to content

BatchBuilder

faststream.confluent.client.BatchBuilder #

BatchBuilder()

A helper class to build a batch of messages to send to Kafka.

Initializes a new BatchBuilder instance.

Source code in faststream/confluent/client.py
def __init__(self) -> None:
    """Create an empty builder; messages are accumulated in ``self._builder``."""
    # Each entry is one message dict, stored in insertion order.
    self._builder: List[AnyDict] = list()

append #

append(
    *, timestamp=None, key=None, value=None, headers=None
)

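Appends a message to the batch. All arguments are keyword-only and optional, but at least one of key or value must be provided; otherwise a KafkaException is raised. If timestamp is omitted, the current time in milliseconds is used, and if headers is omitted, an empty list is stored.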

Source code in faststream/confluent/client.py
def append(
    self,
    *,
    timestamp: Optional[int] = None,
    key: Optional[Union[str, bytes]] = None,
    value: Optional[Union[str, bytes]] = None,
    headers: Optional[List[Tuple[str, bytes]]] = None,
) -> None:
    """Append one message to the batch.

    Args:
        timestamp: Message timestamp stored under ``"timestamp_ms"``. When
            ``None``, ``round(time() * 1000)`` is used instead. An explicit
            ``0`` is stored as given.
        key: Message key. May be ``None`` if ``value`` is provided.
        value: Message payload. May be ``None`` if ``key`` is provided.
        headers: List of ``(name, value)`` header pairs. A missing or empty
            list is stored as a fresh empty list.

    Raises:
        KafkaException: If both ``key`` and ``value`` are ``None``.
    """
    if key is None and value is None:
        raise KafkaException(
            KafkaError(40, reason="Both key and value can't be None")
        )

    # Compare against None rather than relying on truthiness, so that an
    # explicit timestamp of 0 is not silently replaced by the current time.
    if timestamp is None:
        timestamp = round(time() * 1000)

    self._builder.append(
        {
            "timestamp_ms": timestamp,
            "key": key,
            "value": value,
            "headers": headers or [],
        }
    )