Skip to content

BatchBuilder

faststream.confluent.client.BatchBuilder #

BatchBuilder()

A helper class to build a batch of messages to send to Kafka.

Initializes a new BatchBuilder instance.

Source code in faststream/confluent/client.py
def __init__(self) -> None:
    """Create a builder that starts out with an empty batch."""
    # Every instance gets its own fresh list, so batches are never shared
    # between builders; messages are collected here by ``append``.
    self._builder: List[MsgToSend] = []

append #

append(*, timestamp: Optional[int] = None, key: Optional[Union[str, bytes]] = None, value: Optional[Union[str, bytes]] = None, headers: Optional[List[Tuple[str, bytes]]] = None) -> None

Appends a message to the batch with optional timestamp, key, value, and headers.

PARAMETER DESCRIPTION
timestamp

The timestamp of the message. If None, the current time in milliseconds is used.

TYPE: Optional[int] DEFAULT: None

key

The key of the message.

TYPE: Optional[Union[str, bytes]] DEFAULT: None

value

The value of the message.

TYPE: Optional[Union[str, bytes]] DEFAULT: None

headers

A list of headers for the message.

TYPE: Optional[List[Tuple[str, bytes]]] DEFAULT: None

RAISES DESCRIPTION
KafkaException

If both key and value are None.

Source code in faststream/confluent/client.py
def append(
    self,
    *,
    timestamp: Optional[int] = None,
    key: Optional[Union[str, bytes]] = None,
    value: Optional[Union[str, bytes]] = None,
    headers: Optional[List[Tuple[str, bytes]]] = None,
) -> None:
    """Queue a single message at the end of the batch.

    All arguments are keyword-only.

    Args:
        timestamp: Message timestamp. When omitted, ``time() * 1000`` rounded
            to the nearest integer is used instead.
        key: Message key.
        value: Message payload.
        headers: ``(name, value)`` header pairs. When omitted, an empty list
            is stored with the message.

    Raises:
        KafkaException: If neither ``key`` nor ``value`` is provided.
    """
    # Resolve the timestamp first, matching the original evaluation order.
    effective_ts = round(time() * 1000) if timestamp is None else timestamp

    # A record must carry at least a key or a value.
    has_payload = key is not None or value is not None
    if not has_payload:
        error = KafkaError(40, reason="Both key and value can't be None")
        raise KafkaException(error)

    # Only a missing headers argument is replaced; a caller-supplied list
    # (even an empty one) is stored as-is.
    message = MsgToSend(
        timestamp=effective_ts,
        key=key,
        value=value,
        headers=[] if headers is None else headers,
    )
    self._builder.append(message)