A class to represent a raw Redis message.
Source code in faststream/redis/parser.py
| def __init__(
self,
data: bytes,
headers: Optional["AnyDict"] = None,
) -> None:
self.data = data
self.headers = headers or {}
|
build classmethod
build(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
| @classmethod
def build(
cls,
*,
message: Union[Sequence["SendableMessage"], "SendableMessage"],
reply_to: Optional[str],
headers: Optional["AnyDict"],
correlation_id: str,
) -> "RawMessage":
payload, content_type = encode_message(message)
headers_to_send = {
"correlation_id": correlation_id,
}
if content_type:
headers_to_send["content-type"] = content_type
if reply_to:
headers_to_send["reply_to"] = reply_to
if headers is not None:
headers_to_send.update(headers)
return cls(
data=payload,
headers=headers_to_send,
)
|
encode classmethod
encode(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
| @classmethod
def encode(
cls,
*,
message: Union[Sequence["SendableMessage"], "SendableMessage"],
reply_to: Optional[str],
headers: Optional["AnyDict"],
correlation_id: str,
) -> bytes:
msg = cls.build(
message=message,
reply_to=reply_to,
headers=headers,
correlation_id=correlation_id,
)
return dump_json(
{
"data": msg.data,
"headers": msg.headers,
}
)
|
parse staticmethod
Source code in faststream/redis/parser.py
| @staticmethod
def parse(data: bytes) -> Tuple[bytes, "AnyDict"]:
headers: AnyDict
try:
# FastStream message format
parsed_data = json_loads(data)
data = parsed_data["data"].encode()
headers = parsed_data["headers"]
except Exception:
# Raw Redis message format
data = data
headers = {}
return data, headers
|