NatsMessage(raw_message: Msg, body: Union[bytes, Any], decoded_body: Optional[DecodedMessage] = None, headers: AnyDict = dict(), path: AnyDict = dict(), content_type: Optional[str] = None, reply_to: str = '', message_id: str = (lambda: str(uuid4()))(), correlation_id: str = (lambda: str(uuid4()))(), is_js: bool = True)
Bases: StreamMessage[Msg]
A class to represent a NATS message.
committed class-attribute
instance-attribute
committed: bool = field(default=False, init=False)
content_type class-attribute
instance-attribute
correlation_id class-attribute
instance-attribute
decoded_body class-attribute
instance-attribute
decoded_body: Optional[DecodedMessage] = None
headers: AnyDict = field(default_factory=dict)
is_js class-attribute
instance-attribute
message_id class-attribute
instance-attribute
path class-attribute
instance-attribute
processed class-attribute
instance-attribute
processed: bool = field(default=False, init=False)
raw_message instance-attribute
reply_to class-attribute
instance-attribute
ack async
ack(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def ack(self, **kwargs: Any) -> None:
    """Acknowledge this message.

    The parent class's ``ack()`` is awaited first. After that, and only
    when ``self.is_js`` is truthy, ``ack()`` is awaited on every underlying
    raw message whose ``_ackd`` flag is still falsy.

    Args:
        **kwargs: Accepted for signature compatibility; not used here.
    """
    await super().ack()

    if not self.is_js:
        # NOTE(review): ``is_js`` presumably means "received via JetStream";
        # when it is falsy nothing is sent to the raw message(s).
        return

    raw = self.raw_message
    # ``raw_message`` may be a single message or a list of them
    # (presumably a batch -- confirm against the code that builds it).
    candidates = raw if isinstance(raw, list) else [raw]
    for msg in candidates:
        # Skip anything already acknowledged so it is not acked twice.
        if not msg._ackd:
            await msg.ack()
in_progress async
in_progress(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def in_progress(self, **kwargs: Any) -> None:
    """Send an in-progress signal for this message.

    Only when ``self.is_js`` is truthy, ``in_progress()`` is awaited on
    every underlying raw message whose ``_ackd`` flag is still falsy.
    Unlike the other acknowledgement methods shown alongside it, this one
    does not call into the parent class.

    Args:
        **kwargs: Accepted for signature compatibility; not used here.
    """
    if not self.is_js:
        # NOTE(review): ``is_js`` presumably means "received via JetStream";
        # when it is falsy this method does nothing.
        return

    raw = self.raw_message
    # ``raw_message`` may be a single message or a list of them
    # (presumably a batch -- confirm against the code that builds it).
    candidates = raw if isinstance(raw, list) else [raw]
    for msg in candidates:
        # Already-acknowledged messages are skipped.
        if not msg._ackd:
            await msg.in_progress()
nack async
nack(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def nack(self, **kwargs: Any) -> None:
    """Negatively acknowledge this message.

    The parent class's ``nack()`` is awaited first. After that, and only
    when ``self.is_js`` is truthy, ``nak(**kwargs)`` is awaited on every
    underlying raw message whose ``_ackd`` flag is still falsy.

    Args:
        **kwargs: Forwarded unchanged to each raw message's ``nak()`` call.
    """
    await super().nack()

    if not self.is_js:
        # NOTE(review): ``is_js`` presumably means "received via JetStream";
        # when it is falsy nothing is sent to the raw message(s).
        return

    raw = self.raw_message
    # ``raw_message`` may be a single message or a list of them
    # (presumably a batch -- confirm against the code that builds it).
    candidates = raw if isinstance(raw, list) else [raw]
    for msg in candidates:
        # Already-acknowledged messages are skipped.
        if not msg._ackd:
            await msg.nak(**kwargs)
reject async
reject(**kwargs: Any) -> None
Source code in faststream/nats/message.py
async def reject(self, **kwargs: Any) -> None:
    """Reject this message.

    The parent class's ``reject()`` is awaited first. After that, and only
    when ``self.is_js`` is truthy, ``term()`` is awaited on every underlying
    raw message whose ``_ackd`` flag is still falsy.

    Args:
        **kwargs: Accepted for signature compatibility; not forwarded to
            ``term()``.
    """
    await super().reject()

    if not self.is_js:
        # NOTE(review): ``is_js`` presumably means "received via JetStream";
        # when it is falsy nothing is sent to the raw message(s).
        return

    raw = self.raw_message
    # ``raw_message`` may be a single message or a list of them
    # (presumably a batch -- confirm against the code that builds it).
    candidates = raw if isinstance(raw, list) else [raw]
    for msg in candidates:
        # Already-acknowledged messages are skipped.
        if not msg._ackd:
            await msg.term()