Skip to content

build_message

faststream.nats.test.build_message #

build_message(message: SendableMessage, subject: str, *, reply_to: str = '', correlation_id: Optional[str] = None, headers: Optional[AnyDict] = None) -> PatchedMessage
Source code in faststream/nats/test.py
def build_message(
    message: SendableMessage,
    subject: str,
    *,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    headers: Optional[AnyDict] = None,
) -> "PatchedMessage":
    msg, content_type = encode_message(message)
    return PatchedMessage(
        _client=None,  # type: ignore
        subject=subject,
        reply=reply_to,
        data=msg,
        headers={
            "content-type": content_type or "",
            "correlation_id": correlation_id or str(uuid4()),
            **(headers or {}),
        },
    )