Skip to content

NatsMessage

faststream.nats.message.NatsMessage dataclass #

Bases: StreamMessage[Msg]

A class to represent a NATS message.

body instance-attribute #

body: Union[bytes, Any]

committed class-attribute instance-attribute #

committed: bool = field(default=False, init=False)

content_type class-attribute instance-attribute #

content_type: Optional[str] = None

correlation_id class-attribute instance-attribute #

correlation_id: str = field(
    default_factory=lambda: str(uuid4())
)

decoded_body class-attribute instance-attribute #

decoded_body: Optional[DecodedMessage] = None

headers class-attribute instance-attribute #

headers: AnyDict = field(default_factory=dict)

is_js class-attribute instance-attribute #

is_js: bool = True

message_id class-attribute instance-attribute #

message_id: str = field(
    default_factory=lambda: str(uuid4())
)

path class-attribute instance-attribute #

path: AnyDict = field(default_factory=dict)

processed class-attribute instance-attribute #

processed: bool = field(default=False, init=False)

raw_message instance-attribute #

raw_message: Msg

reply_to class-attribute instance-attribute #

reply_to: str = ''

ack async #

ack(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def ack(self, **kwargs: Any) -> None:
    await super().ack()
    if self.is_js:
        if not isinstance(self.raw_message, list):
            if not self.raw_message._ackd:
                await self.raw_message.ack()

        else:
            for m in filter(
                lambda m: not m._ackd,
                self.raw_message,
            ):
                await m.ack()

in_progress async #

in_progress(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def in_progress(self, **kwargs: Any) -> None:
    if self.is_js:
        if not isinstance(self.raw_message, list):
            if not self.raw_message._ackd:
                await self.raw_message.in_progress()

        else:
            for m in filter(
                lambda m: not m._ackd,
                self.raw_message,
            ):
                await m.in_progress()

nack async #

nack(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def nack(self, **kwargs: Any) -> None:
    await super().nack()
    if self.is_js:
        if not isinstance(self.raw_message, list):
            if not self.raw_message._ackd:
                await self.raw_message.nak(**kwargs)

        else:
            for m in filter(
                lambda m: not m._ackd,
                self.raw_message,
            ):
                await m.nak(**kwargs)

reject async #

reject(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def reject(self, **kwargs: Any) -> None:
    await super().reject()
    if self.is_js:
        if not isinstance(self.raw_message, list):
            if not self.raw_message._ackd:
                await self.raw_message.term()

        else:
            for m in filter(
                lambda m: not m._ackd,
                self.raw_message,
            ):
                await m.term()