Skip to content

build_message

faststream.confluent.test.build_message #

build_message(message: SendableMessage, topic: str, partition: Optional[int] = None, timestamp_ms: Optional[int] = None, key: Optional[bytes] = None, headers: Optional[Dict[str, str]] = None, correlation_id: Optional[str] = None, *, reply_to: str = '') -> MockConfluentMessage

Build a mock confluent_kafka.Message for a sendable message.

PARAMETER DESCRIPTION
message

The sendable message to be encoded.

TYPE: SendableMessage

topic

The Kafka topic for the message.

TYPE: str

partition

The Kafka partition for the message. Defaults to None.

TYPE: Optional[int] DEFAULT: None

timestamp_ms

The message timestamp in milliseconds. Defaults to None.

TYPE: Optional[int] DEFAULT: None

key

The message key. Defaults to None.

TYPE: Optional[bytes] DEFAULT: None

headers

Additional headers for the message. Defaults to None.

TYPE: Optional[Dict[str, str]] DEFAULT: None

correlation_id

The correlation ID for the message. Defaults to None.

TYPE: Optional[str] DEFAULT: None

reply_to

The topic to which responses should be sent. Defaults to "".

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
MockConfluentMessage

A mock confluent_kafka.Message object.

TYPE: MockConfluentMessage

Source code in faststream/confluent/test.py
def build_message(
    message: SendableMessage,
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    key: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = "",
) -> MockConfluentMessage:
    """Build a mock confluent_kafka.Message for a sendable message.

    Args:
        message (SendableMessage): The sendable message to be encoded.
        topic (str): The Kafka topic for the message.
        partition (Optional[int], optional): The Kafka partition for the message. Defaults to None.
        timestamp_ms (Optional[int], optional): The message timestamp in milliseconds. Defaults to None.
        key (Optional[bytes], optional): The message key. Defaults to None.
        headers (Optional[Dict[str, str]], optional): Additional headers for the message. Defaults to None.
        correlation_id (Optional[str], optional): The correlation ID for the message. Defaults to None.
        reply_to (str, optional): The topic to which responses should be sent. Defaults to "".

    Returns:
        MockConfluentMessage: A mock confluent_kafka.Message object.
    """
    msg, content_type = encode_message(message)
    k = key or b""
    headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or str(uuid4()),
        "reply_to": reply_to,
        **(headers or {}),
    }

    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
    return MockConfluentMessage(
        raw_msg=msg,
        topic=topic,
        key=k,
        headers=[(i, j.encode()) for i, j in headers.items()],
        offset=0,
        partition=partition or 0,
        timestamp_type=0 + 1,
        timestamp_ms=timestamp_ms or int(datetime.now().timestamp()),
    )