Skip to content

build_message

faststream.kafka.test.build_message #

build_message(
    message: SendableMessage,
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    key: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = ""
) -> ConsumerRecord

Build a Kafka ConsumerRecord for a sendable message.

PARAMETER DESCRIPTION
message

The sendable message to be encoded.

TYPE: SendableMessage

topic

The Kafka topic for the message.

TYPE: str

partition

The Kafka partition for the message. Defaults to None.

TYPE: Optional[int] DEFAULT: None

timestamp_ms

The message timestamp in milliseconds. Defaults to None.

TYPE: Optional[int] DEFAULT: None

key

The message key. Defaults to None.

TYPE: Optional[bytes] DEFAULT: None

headers

Additional headers for the message. Defaults to None.

TYPE: Optional[Dict[str, str]] DEFAULT: None

correlation_id

The correlation ID for the message. Defaults to None.

TYPE: Optional[str] DEFAULT: None

reply_to

The topic to which responses should be sent. Defaults to "".

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
ConsumerRecord

A Kafka ConsumerRecord object.

TYPE: ConsumerRecord

Source code in faststream/kafka/test.py
def build_message(
    message: SendableMessage,
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    key: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = "",
) -> ConsumerRecord:
    """Build a Kafka ConsumerRecord for a sendable message.

    Args:
        message (SendableMessage): The sendable message to be encoded.
        topic (str): The Kafka topic for the message.
        partition (Optional[int], optional): The Kafka partition for the message. Defaults to None.
        timestamp_ms (Optional[int], optional): The message timestamp in milliseconds. Defaults to None.
        key (Optional[bytes], optional): The message key. Defaults to None.
        headers (Optional[Dict[str, str]], optional): Additional headers for the message. Defaults to None.
        correlation_id (Optional[str], optional): The correlation ID for the message. Defaults to None.
        reply_to (str, optional): The topic to which responses should be sent. Defaults to "".

    Returns:
        ConsumerRecord: A Kafka ConsumerRecord object.
    """
    msg, content_type = encode_message(message)
    k = key or b""
    headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or str(uuid4()),
        "reply_to": reply_to,
        **(headers or {}),
    }

    return ConsumerRecord(
        value=msg,
        topic=topic,
        partition=partition or 0,
        timestamp=timestamp_ms or int(datetime.now().timestamp()),
        timestamp_type=0,
        key=k,
        serialized_key_size=len(k),
        serialized_value_size=len(msg),
        checksum=sum(msg),
        offset=0,
        headers=[(i, j.encode()) for i, j in headers.items()],
    )