Skip to content

build_message

faststream.kafka.testing.build_message #

build_message(message, topic, partition=None, timestamp_ms=None, key=None, headers=None, correlation_id=None, *, reply_to='')

Build a Kafka ConsumerRecord for a sendable message.

Source code in faststream/kafka/testing.py
def build_message(
    message: "SendableMessage",
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    key: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = "",
) -> "ConsumerRecord":
    """Build a Kafka ConsumerRecord for a sendable message."""
    msg, content_type = encode_message(message)

    k = key or b""

    headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or gen_cor_id(),
        **(headers or {}),
    }

    if reply_to:
        headers["reply_to"] = headers.get("reply_to", reply_to)

    return ConsumerRecord(
        value=msg,
        topic=topic,
        partition=partition or 0,
        timestamp=timestamp_ms or int(datetime.now().timestamp()),
        timestamp_type=0,
        key=k,
        serialized_key_size=len(k),
        serialized_value_size=len(msg),
        checksum=sum(msg),
        offset=0,
        headers=[(i, j.encode()) for i, j in headers.items()],
    )