def build_message(
    message: "SendableMessage",
    topic: str,
    *,
    correlation_id: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    key: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
) -> MockConfluentMessage:
    """Build a mock confluent_kafka.Message for a sendable message.

    Args:
        message: Payload to send; it is passed through ``encode_message``,
            which yields the raw body and its content type.
        topic: Topic name stored on the mock message.
        correlation_id: Value placed in the ``correlation_id`` header.
        partition: Partition stored on the mock message; ``None`` becomes 0.
        timestamp_ms: Message timestamp in milliseconds since the epoch.
            When ``None``, the current time is used.
        key: Message key; ``None`` becomes ``b""``.
        headers: Extra headers. They are merged last, so an entry here
            overrides the generated ``content-type``, ``correlation_id``
            and ``reply_to`` headers of the same name.
        reply_to: Value placed in the ``reply_to`` header.

    Returns:
        A ``MockConfluentMessage`` with offset 0 and header values encoded
        to bytes.
    """
    payload, content_type = encode_message(message)

    # Caller-supplied headers are spread last so they win on key collisions.
    merged_headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id,
        "reply_to": reply_to,
        **(headers or {}),
    }

    # Only fall back to "now" when no timestamp was given at all: an explicit
    # 0 is a legitimate value and must not be replaced. The fallback has to be
    # in milliseconds to match the unit of ``timestamp_ms``.
    if timestamp_ms is None:
        timestamp_ms = int(datetime.now().timestamp() * 1000)

    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
    return MockConfluentMessage(
        raw_msg=payload,
        topic=topic,
        key=key or b"",
        # Header values are str here; the mock receives them as bytes.
        headers=[(name, value.encode()) for name, value in merged_headers.items()],
        offset=0,
        partition=partition or 0,
        # 1 is the value of confluent_kafka.TIMESTAMP_CREATE_TIME.
        timestamp_type=1,
        timestamp_ms=timestamp_ms,
    )