Skip to content

SimpleParser

faststream.redis.parser.SimpleParser #

SimpleParser(pattern=None)
Source code in faststream/redis/parser.py
def __init__(
    self,
    pattern: Optional["Pattern[str]"] = None,
) -> None:
    self.pattern = pattern

msg_class instance-attribute #

msg_class

pattern instance-attribute #

pattern = pattern

parse_message async #

parse_message(message)
Source code in faststream/redis/parser.py
async def parse_message(
    self,
    message: Mapping[str, Any],
) -> "StreamMessage[Mapping[str, Any]]":
    data, headers, batch_headers = self._parse_data(message)

    id_ = gen_cor_id()

    return self.msg_class(
        raw_message=message,
        body=data,
        path=self.get_path(message),
        headers=headers,
        batch_headers=batch_headers,
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        message_id=headers.get("message_id", id_),
        correlation_id=headers.get("correlation_id", id_),
    )

get_path #

get_path(message)
Source code in faststream/redis/parser.py
def get_path(self, message: Mapping[str, Any]) -> "AnyDict":
    if (
        message.get("pattern")
        and (path_re := self.pattern)
        and (match := path_re.match(message["channel"]))
    ):
        return match.groupdict()

    else:
        return {}

decode_message async #

decode_message(msg)
Source code in faststream/redis/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[MsgType]",
) -> DecodedMessage:
    return decode_message(msg)