Skip to content

build_message

faststream.redis.testing.build_message #

build_message(message, *, correlation_id, reply_to='', headers=None)
Source code in faststream/redis/testing.py
def build_message(
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    *,
    correlation_id: str,
    reply_to: str = "",
    headers: Optional["AnyDict"] = None,
) -> bytes:
    data = RawMessage.encode(
        message=message,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
    )
    return data