Skip to content

BatchParser

faststream.nats.parser.BatchParser #

BatchParser(*, pattern)

Bases: JsParser

A class to parse NATS batch messages.

Source code in faststream/nats/parser.py
def __init__(
    self,
    *,
    pattern: str,
) -> None:
    path_re, _ = compile_nats_wildcard(pattern)
    self.__path_re = path_re

get_path #

get_path(subject)
Source code in faststream/nats/parser.py
def get_path(
    self,
    subject: str,
) -> Optional["AnyDict"]:
    path: Optional[AnyDict] = None

    if (path_re := self.__path_re) is not None and (
        match := path_re.match(subject)
    ) is not None:
        path = match.groupdict()

    return path

decode_message async #

decode_message(msg)
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Any]",
) -> "DecodedMessage":
    return decode_message(msg)

parse_message async #

parse_message(message, *, path=None)
Source code in faststream/nats/parser.py
async def parse_message(
    self,
    message: "Msg",
    *,
    path: Optional["AnyDict"] = None,
) -> "StreamMessage[Msg]":
    if path is None:
        path = self.get_path(message.subject)

    headers = message.header or {}

    return NatsMessage(
        raw_message=message,
        body=message.data,
        path=path or {},
        reply_to=headers.get("reply_to", ""),  # differ from core
        headers=headers,
        content_type=headers.get("content-type", ""),
        message_id=headers.get("message_id", gen_cor_id()),
        correlation_id=headers.get("correlation_id", gen_cor_id()),
    )

parse_batch async #

parse_batch(message)
Source code in faststream/nats/parser.py
async def parse_batch(
    self,
    message: List["Msg"],
) -> "StreamMessage[List[Msg]]":
    body: List[bytes] = []
    batch_headers: List[Dict[str, str]] = []

    if message:
        path = self.get_path(message[0].subject)

        for m in message:
            batch_headers.append(m.headers or {})
            body.append(m.data)

    else:
        path = None

    headers = next(iter(batch_headers), {})

    return NatsBatchMessage(
        raw_message=message,
        body=body,
        path=path or {},
        headers=headers,
        batch_headers=batch_headers,
    )

decode_batch async #

decode_batch(msg)
Source code in faststream/nats/parser.py
async def decode_batch(
    self,
    msg: "StreamMessage[List[Msg]]",
) -> List["DecodedMessage"]:
    data: List[DecodedMessage] = []

    path: Optional[AnyDict] = None
    for m in msg.raw_message:
        one_msg = await self.parse_message(m, path=path)
        path = one_msg.path

        data.append(decode_message(one_msg))

    return data