build_message
faststream.nats.test.build_message
build_message(
message: SendableMessage,
subject: str,
*,
reply_to: str = "",
correlation_id: Optional[str] = None,
headers: Optional[AnyDict] = None
) -> PatchedMessage
Source code in faststream/nats/test.py
def build_message(
    message: SendableMessage,
    subject: str,
    *,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    headers: Optional[AnyDict] = None,
) -> "PatchedMessage":
    """Wrap *message* in a ``PatchedMessage`` addressed to *subject*.

    The payload is passed through ``encode_message``, which yields the
    encoded body together with its content type.

    Args:
        message: Payload to encode and place in the message's ``data``.
        subject: Value stored as the message's ``subject``.
        reply_to: Value stored as the message's ``reply`` field.
        correlation_id: Identifier written to the ``correlation_id``
            header; a random UUID4 string is generated when this is
            ``None`` or empty.
        headers: Extra headers. They are merged last, so a caller-supplied
            ``content-type`` or ``correlation_id`` key replaces the
            default computed here.

    Returns:
        A ``PatchedMessage`` built with ``_client=None``.
    """
    payload, content_type = encode_message(message)

    # Defaults are laid down first; anything the caller passes wins on
    # key collision because it is unpacked afterwards.
    default_headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or str(uuid4()),
    }
    extra_headers = headers if headers else {}

    return PatchedMessage(
        _client=None,  # type: ignore
        subject=subject,
        reply=reply_to,
        data=payload,
        headers={**default_headers, **extra_headers},
    )
Last update: 1 year ago (2023-11-13)