A class to parse NATS messages.
Source code in faststream/nats/parser.py
def __init__(
    self,
    *,
    pattern: str,
) -> None:
    """Prepare the parser for the given subject pattern.

    Args:
        pattern: Subject pattern handed to ``compile_nats_wildcard``.
            Keyword-only.

    The helper's result is unpacked as a pair; only the first element is
    kept (stored on the instance as the private ``__path_re`` attribute),
    the second one is discarded.
    """
    # NOTE(review): the first element is presumably a compiled regex or
    # None (get_path checks it against None before matching) -- confirm
    # against compile_nats_wildcard.
    compiled_re, _unused = compile_nats_wildcard(pattern)
    self.__path_re = compiled_re
get_path
Source code in faststream/nats/parser.py
def get_path(
    self,
    subject: str,
) -> Optional["AnyDict"]:
    """Extract the named groups of the stored pattern from *subject*.

    Args:
        subject: The subject string to match against the stored regex.

    Returns:
        The ``groupdict()`` of the match when a regex is stored and it
        matches at the start of *subject*; ``None`` when no regex is
        stored or the subject does not match.
    """
    pattern_re = self.__path_re
    if pattern_re is None:
        # No regex was stored for this parser: nothing to extract.
        return None

    found = pattern_re.match(subject)
    if found is None:
        return None

    return found.groupdict()
decode_message async
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Any]",
) -> "DecodedMessage":
    """Decode *msg* by delegating to the ``decode_message`` function.

    Args:
        msg: The stream message to decode.

    Returns:
        Whatever the delegated ``decode_message(msg)`` call returns; the
        call is made synchronously (it is not awaited).
    """
    # Inside a method body the bare name resolves to the module-scope
    # ``decode_message`` (defined outside this view), not to this method.
    decoded = decode_message(msg)
    return decoded