Skip to content

RawMessage

faststream.redis.parser.RawMessage #

Bases: BaseModel

A class to represent a raw Redis message.

data instance-attribute #

data: bytes

headers class-attribute instance-attribute #

headers: AnyDict = Field(default_factory=dict)

build classmethod #

build(message: Union[Sequence[SendableMessage], SendableMessage], reply_to: str = '', headers: Optional[AnyDict] = None, correlation_id: Optional[str] = None) -> RawMessage
Source code in faststream/redis/parser.py
@classmethod
def build(
    cls,
    message: Union[Sequence[SendableMessage], SendableMessage],
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
) -> "RawMessage":
    """Create a ``RawMessage`` from a payload plus its delivery metadata.

    The payload and content type come from ``encode_message(message)``.
    Headers are assembled in this order, later entries overriding
    earlier ones on key collision:

    1. ``"correlation_id"`` -- the given value, or a fresh ``uuid4``
       string when it is falsy.
    2. ``"content-type"`` -- only when ``encode_message`` reports one.
    3. ``"reply_to"`` -- only when a non-empty value is passed.
    4. Every entry of ``headers``, when it is not ``None``.

    Args:
        message: Body (or sequence of bodies) handed to ``encode_message``.
        reply_to: Value for the ``"reply_to"`` header; skipped when empty.
        headers: Extra headers merged last, so they win over the
            generated ones.
        correlation_id: Explicit correlation id; generated when omitted.

    Returns:
        A new instance of ``cls`` carrying the encoded data and headers.
    """
    body, mime_type = encode_message(message)

    if not correlation_id:
        correlation_id = str(uuid4())
    merged: AnyDict = {"correlation_id": correlation_id}

    # Optional headers are emitted only when truthy, in this fixed order.
    optional_headers = (
        ("content-type", mime_type),
        ("reply_to", reply_to),
    )
    merged.update((key, value) for key, value in optional_headers if value)

    # Caller-supplied headers go last so they can override the defaults.
    if headers is not None:
        merged.update(headers)

    return cls(data=body, headers=merged)

encode classmethod #

encode(message: Union[Sequence[SendableMessage], SendableMessage], reply_to: str = '', headers: Optional[AnyDict] = None, correlation_id: Optional[str] = None) -> str
Source code in faststream/redis/parser.py
@classmethod
def encode(
    cls,
    message: Union[Sequence[SendableMessage], SendableMessage],
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
) -> str:
    """Build a raw message and serialise it with ``model_to_json``.

    All arguments are forwarded unchanged, by keyword, to ``cls.build``;
    the resulting model is then passed to ``model_to_json``.

    Args:
        message: Body (or sequence of bodies) to wrap.
        reply_to: Forwarded to ``build``.
        headers: Forwarded to ``build``.
        correlation_id: Forwarded to ``build``.

    Returns:
        Whatever ``model_to_json`` produces for the built message.
    """
    raw_message = cls.build(
        message=message,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
    )
    return model_to_json(raw_message)