Bases: StreamMessage[List[Msg]]
A class to represent a NATS batch message.
raw_message instance-attribute
path class-attribute
instance-attribute
content_type class-attribute
instance-attribute
reply_to class-attribute
instance-attribute
message_id class-attribute
instance-attribute
correlation_id class-attribute
instance-attribute
processed class-attribute
instance-attribute
processed = field(default=False, init=False)
committed class-attribute
instance-attribute
committed = field(default=None, init=False)
decoded_body property
writable
decode async
Serialize the message by lazy decoder.
Source code in faststream/broker/message.py
| async def decode(self) -> Optional["DecodedMessage"]:
"""Serialize the message by lazy decoder."""
# TODO: make it lazy after `decoded_body` removed
return self._decoded_body
|
ack async
Source code in faststream/nats/message.py
| async def ack(self) -> None:
for m in filter(
lambda m: not m._ackd,
self.raw_message,
):
await m.ack()
await super().ack()
|
nack async
Source code in faststream/nats/message.py
| async def nack(
self,
delay: Union[int, float, None] = None,
) -> None:
for m in filter(
lambda m: not m._ackd,
self.raw_message,
):
await m.nak(delay=delay)
await super().nack()
|
reject async
Source code in faststream/nats/message.py
| async def reject(self) -> None:
for m in filter(
lambda m: not m._ackd,
self.raw_message,
):
await m.term()
await super().reject()
|
in_progress async
Source code in faststream/nats/message.py
| async def in_progress(self) -> None:
for m in filter(
lambda m: not m._ackd,
self.raw_message,
):
await m.in_progress()
|