def build_message(
    message: "SendableMessage",
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    key: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = "",
) -> "ConsumerRecord":
    """Build a Kafka ConsumerRecord for a sendable message.

    Args:
        message: Payload passed to ``encode_message``, which returns the
            encoded body and its content type.
        topic: Topic name stored on the record.
        partition: Partition stored on the record; ``None`` falls back to 0.
        timestamp_ms: Record timestamp in milliseconds since the epoch.
            ``None`` means "now"; an explicit ``0`` is preserved.
        key: Record key; ``None`` (or empty) becomes ``b""``.
        headers: Extra headers. They are merged after the generated
            ``content-type`` / ``correlation_id`` entries, so caller-supplied
            values for those keys win.
        correlation_id: Correlation id header value; when falsy a new one is
            produced by ``gen_cor_id()``.
        reply_to: When non-empty, added as the ``reply_to`` header unless the
            caller's ``headers`` already define one.

    Returns:
        A ``ConsumerRecord`` with offset 0, ``timestamp_type`` 0 and headers
        encoded as ``(name, bytes)`` pairs.
    """
    # NOTE(review): `msg` is assumed to be bytes-like (len() and sum() are
    # applied to it below) -- confirm against encode_message.
    msg, content_type = encode_message(message)
    k = key or b""

    # Caller headers are spread last so they override the generated defaults.
    record_headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or gen_cor_id(),
        **(headers or {}),
    }
    if reply_to:
        # An explicit "reply_to" header from the caller takes precedence.
        record_headers.setdefault("reply_to", reply_to)

    if timestamp_ms is None:
        # datetime.timestamp() yields seconds; the record expects milliseconds.
        timestamp_ms = int(datetime.now().timestamp() * 1000)

    return ConsumerRecord(
        value=msg,
        topic=topic,
        partition=partition or 0,
        timestamp=timestamp_ms,
        timestamp_type=0,
        key=k,
        serialized_key_size=len(k),
        serialized_value_size=len(msg),
        checksum=sum(msg),
        offset=0,
        headers=[(name, value.encode()) for name, value in record_headers.items()],
    )