Skip to content

RedisParser

faststream.redis.parser.RedisParser #

A class to represent a Redis parser.

decode_message async staticmethod #

decode_message(msg: OneRedisMessage) -> DecodedMessage
Source code in faststream/redis/parser.py
@staticmethod
async def decode_message(
    msg: OneRedisMessage,
) -> DecodedMessage:
    return decode_message(msg)

parse_message async classmethod #

Source code in faststream/redis/parser.py
@classmethod
async def parse_message(
    cls,
    message: Union[OneMessage, BatchMessage],
) -> Union[OneRedisMessage, BatchRedisMessage]:
    id_ = str(uuid4())

    if message["type"] == "batch":
        data = dump_json([cls.parse_one_msg(x)[0] for x in message["data"]])

        return BatchRedisMessage(
            raw_message=message,
            body=data,
            content_type="application/json",
            message_id=id_,
            correlation_id=id_,
        )

    else:
        data, headers = cls.parse_one_msg(message["data"])

        channel = message.get("channel", b"").decode()

        handler: Optional["Handler"] = context.get_local("handler_")
        if (
            handler is not None
            and handler.channel is not None
            and (path_re := handler.channel.path_regex) is not None
            and (match := path_re.match(channel)) is not None
        ):
            path = match.groupdict()
        else:
            path = {}

        return OneRedisMessage(
            raw_message=message,
            body=data,
            path=path,
            headers=headers,
            reply_to=headers.get("reply_to", ""),
            content_type=headers.get("content-type", ""),
            message_id=message.get("message_id", id_),
            correlation_id=headers.get("correlation_id", id_),
        )

parse_one_msg staticmethod #

parse_one_msg(raw_data: bytes) -> Tuple[bytes, AnyDict]
Source code in faststream/redis/parser.py
@staticmethod
def parse_one_msg(raw_data: bytes) -> Tuple[bytes, AnyDict]:
    try:
        obj = model_parse(RawMessage, raw_data)
    except Exception:
        # Raw Redis message format
        data = raw_data
        headers: AnyDict = {}
    else:
        # FastStream message format
        data = obj.data
        headers = obj.headers

    return data, headers