Skip to content

build_message

faststream.redis.test.build_message

build_message(message: Union[Sequence[SendableMessage], SendableMessage], channel: str, *, reply_to: str = '', correlation_id: Optional[str] = None, headers: Optional[AnyDict] = None) -> AnyRedisDict
Source code in faststream/redis/test.py
def build_message(
    message: Union[Sequence[SendableMessage], SendableMessage],
    channel: str,
    *,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    headers: Optional[AnyDict] = None,
) -> AnyRedisDict:
    """Assemble a raw Redis message dict for the given channel.

    The payload is produced by ``RawMessage.encode`` and then byte-encoded;
    the channel name is byte-encoded as well.

    Args:
        message: Body (or sequence of bodies) handed to ``RawMessage.encode``.
        channel: Channel name; stored as ``channel.encode()``.
        reply_to: Reply destination forwarded to ``RawMessage.encode``.
        correlation_id: Correlation id forwarded to ``RawMessage.encode``.
        headers: Extra headers forwarded to ``RawMessage.encode``.

    Returns:
        An ``AnyRedisDict`` with ``channel``, ``data`` and ``type`` keys,
        where ``type`` is always ``"message"``.
    """
    # Serialize the envelope first so any failure there surfaces before
    # the channel name is touched.
    serialized = RawMessage.encode(
        message=message,
        reply_to=reply_to,
        correlation_id=correlation_id,
        headers=headers,
    )

    raw_channel = channel.encode()
    raw_payload = serialized.encode()

    return AnyRedisDict(
        channel=raw_channel,
        data=raw_payload,
        type="message",
    )