Skip to content

NatsBaseParser

faststream.nats.parser.NatsBaseParser #

NatsBaseParser(*, pattern)

A class to parse NATS messages.

Source code in faststream/nats/parser.py
def __init__(
    self,
    *,
    pattern: str,
) -> None:
    """Prepare the parser for the given subject pattern.

    Args:
        pattern: Pattern string handed to ``compile_nats_wildcard``; the
            regex it yields is later matched against subjects in
            ``get_path``.
    """
    # The helper returns a pair; only its first element is kept.
    compiled_pair = compile_nats_wildcard(pattern)
    subject_re, _ignored = compiled_pair
    self.__path_re = subject_re

get_path #

get_path(subject)
Source code in faststream/nats/parser.py
def get_path(
    self,
    subject: str,
) -> Optional["AnyDict"]:
    """Extract the named groups the stored regex captures from ``subject``.

    Args:
        subject: String to match against the regex saved by ``__init__``.

    Returns:
        The ``groupdict()`` of the match, or ``None`` when no regex is
        stored or the regex does not match ``subject``.
    """
    regex = self.__path_re
    if regex is None:
        # No pattern regex available, so there is nothing to extract.
        return None

    # ``match`` anchors at the start of ``subject``.
    found = regex.match(subject)
    if found is None:
        return None

    return found.groupdict()

decode_message async #

decode_message(msg)
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Any]",
) -> "DecodedMessage":
    """Decode ``msg`` by delegating to the ``decode_message`` helper.

    Args:
        msg: The stream message to decode.

    Returns:
        Whatever the ``decode_message`` helper returns for ``msg``.
    """
    # The helper is invoked as a plain call; nothing is awaited here.
    decoded = decode_message(msg)
    return decoded