Skip to content

NatsParser

faststream.nats.parser.NatsParser #

NatsParser(is_js: bool)
Source code in faststream/nats/parser.py
def __init__(self, is_js: bool) -> None:
    """Create a parser and remember the ``is_js`` flag.

    Args:
        is_js: Stored unchanged as ``self.is_js``.
            NOTE(review): presumably marks a JetStream subscription --
            confirm against the code that constructs the parser.
    """
    self.is_js = is_js

is_js instance-attribute #

is_js = is_js

decode_message async staticmethod #

decode_message(msg: StreamMessage[Msg]) -> DecodedMessage
Source code in faststream/nats/parser.py
@staticmethod
async def decode_message(
    msg: StreamMessage[Msg],
) -> DecodedMessage:
    """Decode an already-parsed message.

    This is a thin async wrapper: the actual work is handed to the
    ``decode_message`` callable found in the enclosing module scope and
    its result is returned unchanged.

    Args:
        msg: The stream message to decode.

    Returns:
        Whatever the module-level ``decode_message`` helper produces for
        ``msg``.
    """
    decoded: DecodedMessage = decode_message(msg)
    return decoded

parse_message async #

parse_message(message: Msg) -> StreamMessage[Msg]
Source code in faststream/nats/parser.py
async def parse_message(
    self,
    message: Msg,
) -> StreamMessage[Msg]:
    """Wrap a raw ``Msg`` into a ``NatsMessage``.

    Args:
        message: The raw message; its ``header``, ``subject``, ``data``
            and ``reply`` attributes are read.

    Returns:
        A ``NatsMessage`` built from ``message``:

        * ``headers`` is ``message.header`` or an empty dict when that is
          falsy.
        * ``path`` holds the named groups of the current handler's
          ``path_regex`` matched against ``message.subject``; it is empty
          when there is no handler in the context, no regex, or no match.
        * ``reply_to`` comes from the ``"reply_to"`` header (default
          ``""``) when ``self.is_js`` is truthy, otherwise from
          ``message.reply``.
        * ``message_id`` / ``correlation_id`` come from the headers of the
          same name, falling back to a freshly generated UUID4 string.
    """
    headers = message.header or {}

    # Resolve the subject pattern step by step; every stage collapses to
    # ``None`` as soon as the previous one has nothing to offer.
    handler = context.get("handler_")
    subject_re: Optional[Pattern[str]] = handler.path_regex if handler else None
    found = subject_re.match(message.subject) if subject_re is not None else None
    path: AnyDict = found.groupdict() if found is not None else {}

    # When ``is_js`` is set the reply subject is read from the headers
    # instead of the message's own ``reply`` attribute.
    if self.is_js:
        reply_to = headers.get("reply_to", "")
    else:
        reply_to = message.reply

    return NatsMessage(
        is_js=self.is_js,
        raw_message=message,
        body=message.data,
        path=path,
        reply_to=reply_to,
        headers=headers,
        content_type=headers.get("content-type", ""),
        message_id=headers.get("message_id", str(uuid4())),
        correlation_id=headers.get("correlation_id", str(uuid4())),
    )

Last update: 2023-11-13