Bases: JsParser
A class to parse NATS batch messages.
Source code in faststream/nats/parser.py
def __init__(
    self,
    *,
    pattern: str,
) -> None:
    """Build the parser for a given subject pattern.

    Args:
        pattern: Subject pattern handed to ``compile_nats_wildcard``.
            Only the first element of the returned pair is kept; the
            second one is discarded.
    """
    compiled = compile_nats_wildcard(pattern)
    # Unpack exactly two values, as the helper returns a pair.
    subject_re, _unused = compiled
    self.__path_re = subject_re
get_path
Source code in faststream/nats/parser.py
def get_path(
    self,
    subject: str,
) -> Optional["AnyDict"]:
    """Extract named path parameters from a subject.

    Args:
        subject: Subject string matched against the stored path regex.

    Returns:
        The named groups of the match as a dict, or ``None`` when no
        regex is stored or the subject does not match it.
    """
    regex = self.__path_re
    if regex is None:
        return None

    found = regex.match(subject)
    if found is None:
        return None

    return found.groupdict()
decode_message async
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Any]",
) -> "DecodedMessage":
    """Decode an already parsed message.

    Delegates to the free function ``decode_message`` (the bare name
    resolves outside the class, not to this method).

    Args:
        msg: Parsed stream message to decode.

    Returns:
        Whatever the free ``decode_message`` function returns for ``msg``.
    """
    decoded = decode_message(msg)
    return decoded
parse_message async
parse_message(message, *, path=None)
Source code in faststream/nats/parser.py
async def parse_message(
    self,
    message: "Msg",
    *,
    path: Optional["AnyDict"] = None,
) -> "StreamMessage[Msg]":
    """Wrap a single raw message into a ``NatsMessage``.

    Args:
        message: Raw message; its ``subject``, ``header`` and ``data``
            attributes are read.
        path: Pre-computed path parameters. When ``None``, they are
            derived from ``message.subject`` via ``get_path``.

    Returns:
        A ``NatsMessage`` built from the raw message and its headers.
    """
    resolved_path = self.get_path(message.subject) if path is None else path

    # A missing/empty header mapping is treated as "no headers".
    raw_headers = message.header
    headers = raw_headers if raw_headers else {}

    # NOTE: gen_cor_id() is passed as a ``dict.get`` default, so it is
    # evaluated for both ids even when the header is present.
    return NatsMessage(
        raw_message=message,
        body=message.data,
        path=resolved_path or {},
        reply_to=headers.get("reply_to", ""),  # differ from core
        headers=headers,
        content_type=headers.get("content-type", ""),
        message_id=headers.get("message_id", gen_cor_id()),
        correlation_id=headers.get("correlation_id", gen_cor_id()),
    )
parse_batch async
Source code in faststream/nats/parser.py
async def parse_batch(
    self,
    message: List["Msg"],
) -> "StreamMessage[List[Msg]]":
    """Wrap a list of raw messages into a single ``NatsBatchMessage``.

    Args:
        message: Raw messages of the batch; ``subject`` is read from the
            first one, ``headers`` and ``data`` from each one.

    Returns:
        A ``NatsBatchMessage`` whose body is the list of payloads, whose
        ``batch_headers`` holds one mapping per message, and whose
        ``headers`` is the first message's mapping (``{}`` for an empty
        batch).
    """
    # The path is taken from the first message only; an empty batch has none.
    path = self.get_path(message[0].subject) if message else None

    batch_headers: List[Dict[str, str]] = [item.headers or {} for item in message]
    body: List[bytes] = [item.data for item in message]

    first_headers = batch_headers[0] if batch_headers else {}

    return NatsBatchMessage(
        raw_message=message,
        body=body,
        path=path or {},
        headers=first_headers,
        batch_headers=batch_headers,
    )
decode_batch async
Source code in faststream/nats/parser.py
async def decode_batch(
    self,
    msg: "StreamMessage[List[Msg]]",
) -> List["DecodedMessage"]:
    """Decode every raw message contained in a parsed batch.

    Each item of ``msg.raw_message`` is parsed with ``parse_message`` and
    then passed to the free ``decode_message`` function.

    Args:
        msg: Parsed batch whose ``raw_message`` is iterated.

    Returns:
        The decoded payloads, in the order of ``msg.raw_message``.
    """
    decoded: List[DecodedMessage] = []
    # Starts as None so the first parse_message call resolves the path
    # itself; the path of each parsed message is then passed to the next call.
    carried_path: Optional[AnyDict] = None

    for raw in msg.raw_message:
        parsed = await self.parse_message(raw, path=carried_path)
        carried_path = parsed.path
        decoded.append(decode_message(parsed))

    return decoded