Skip to content

NatsParser

faststream.nats.parser.NatsParser #

NatsParser(is_js: bool)

A class to parse NATS messages.

Initialize the NATS parser.

PARAMETER DESCRIPTION
is_js

Whether the parser is for JetStream.

TYPE: bool

Source code in faststream/nats/parser.py
def __init__(self, is_js: bool) -> None:
    """Create a parser and remember which NATS flavour it serves.

    Args:
        is_js: Whether the parser is for JetStream.
    """
    self.is_js: bool = is_js

is_js instance-attribute #

is_js = is_js

decode_message async #

decode_message(msg: Union[StreamMessage[Msg], StreamMessage[List[Msg]]]) -> Union[List[DecodedMessage], DecodedMessage]
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: Union[
        StreamMessage[Msg],
        StreamMessage[List[Msg]],
    ],
) -> Union[List[DecodedMessage], DecodedMessage]:
    """Decode a parsed message, or every raw message inside a batch.

    Args:
        msg: A parsed stream message whose ``raw_message`` is either a
            single raw message or a list of raw messages.

    Returns:
        For a single message, the result of the module-level
        ``decode_message`` helper. For a batch, a list holding one decoded
        entry per raw message, in the original order.
    """
    raw = msg.raw_message
    if not isinstance(raw, list):
        return decode_message(msg)

    decoded: List[DecodedMessage] = []
    # The path of each parsed item is handed to the next ``parse_message``
    # call; the very first call receives ``None``.
    known_path: Optional[AnyDict] = None
    for raw_item in raw:
        parsed = await self.parse_message(raw_item, path=known_path)
        known_path = parsed.path
        decoded.append(decode_message(parsed))

    return decoded

parse_message async #

parse_message(message: Union[Msg, List[Msg]], *, path: Optional[AnyDict] = None) -> Union[StreamMessage[Msg], StreamMessage[List[Msg]]]
Source code in faststream/nats/parser.py
async def parse_message(
    self, message: Union[Msg, List[Msg]], *, path: Optional[AnyDict] = None
) -> Union[
    StreamMessage[Msg],
    StreamMessage[List[Msg]],
]:
    """Wrap a raw NATS message, or a batch of them, in a ``NatsMessage``.

    Args:
        message: A single raw message or a list of raw messages.
        path: Path parameters to attach to a single message. When ``None``,
            they are taken from matching the message subject against the
            ``path_regex`` of the handler stored in the local context under
            ``"handler_"``, if both exist and the subject matches.

    Returns:
        A ``NatsMessage``. For a batch only ``is_js``, ``raw_message`` and
        ``body`` (the list of payloads) are passed; for a single message the
        path, reply subject, headers, content type and ids are passed too.
    """
    if isinstance(message, list):
        payloads = [item.data for item in message]
        return NatsMessage(
            is_js=self.is_js,
            raw_message=message,  # type: ignore[arg-type]
            body=payloads,
        )

    if path is None:
        # Walk handler -> regex -> match, stopping at the first missing link.
        handler: Optional["Handler"] = context.get_local("handler_")
        path_re = None if handler is None else handler.path_regex
        found = None if path_re is None else path_re.match(message.subject)
        if found is not None:
            path = found.groupdict()

    headers = message.header or {}

    # With JetStream the reply subject is read from the headers; otherwise
    # the raw message's own ``reply`` field is used.
    if self.is_js:
        reply_to = headers.get("reply_to", "")
    else:
        reply_to = message.reply

    return NatsMessage(
        is_js=self.is_js,
        raw_message=message,
        body=message.data,
        path=path or {},
        reply_to=reply_to,
        headers=headers,
        content_type=headers.get("content-type", ""),
        # A fresh UUID is the fallback when the header carries no id.
        message_id=headers.get("message_id", str(uuid4())),
        correlation_id=headers.get("correlation_id", str(uuid4())),
    )