Skip to content

RabbitMessage

faststream.rabbit.message.RabbitMessage dataclass #

RabbitMessage(raw_message, body, headers=dict(), batch_headers=list(), path=dict(), content_type=None, reply_to='', message_id=gen_cor_id(), correlation_id=gen_cor_id(), _source_type=Consume)

Bases: StreamMessage[IncomingMessage]

A message class for working with RabbitMQ messages.

This class extends StreamMessage to provide additional functionality for acknowledging, rejecting, or nack-ing RabbitMQ messages.

raw_message instance-attribute #

raw_message

body instance-attribute #

body

headers class-attribute instance-attribute #

headers = field(default_factory=dict)

batch_headers class-attribute instance-attribute #

batch_headers = field(default_factory=list)

path class-attribute instance-attribute #

path = field(default_factory=dict)

content_type class-attribute instance-attribute #

content_type = None

reply_to class-attribute instance-attribute #

reply_to = ''

message_id class-attribute instance-attribute #

message_id = field(default_factory=gen_cor_id)

correlation_id class-attribute instance-attribute #

correlation_id = field(default_factory=gen_cor_id)

processed class-attribute instance-attribute #

processed = field(default=False, init=False)

committed class-attribute instance-attribute #

committed = field(default=None, init=False)

decoded_body property writable #

decoded_body

decode async #

decode()

Decode the message body using the lazy decoder.

Source code in faststream/broker/message.py
async def decode(self) -> Optional["DecodedMessage"]:
    """Return the decoded message body.

    Returns:
        The value currently stored in ``self._decoded_body``.
    """
    # TODO: make it lazy after `decoded_body` removed
    decoded = self._decoded_body
    return decoded

ack async #

ack(multiple=False)

Acknowledge the RabbitMQ message.

Source code in faststream/rabbit/message.py
async def ack(
    self,
    multiple: bool = False,
) -> None:
    """Acknowledge the RabbitMQ message.

    The parent class's ``ack`` is awaited first; the acknowledgement is then
    forwarded to the raw message unless that message reports ``locked``.

    Args:
        multiple: Passed through unchanged to the raw message's ``ack``.
    """
    incoming = self.raw_message
    await super().ack()
    # NOTE(review): `locked` presumably means the raw message was already
    # settled, so it must not be acknowledged again -- confirm.
    if not incoming.locked:
        await incoming.ack(multiple=multiple)

nack async #

nack(multiple=False, requeue=True)

Negatively acknowledge the RabbitMQ message.

Source code in faststream/rabbit/message.py
async def nack(
    self,
    multiple: bool = False,
    requeue: bool = True,
) -> None:
    """Negatively acknowledge the RabbitMQ message.

    The parent class's ``nack`` is awaited first; the negative acknowledgement
    is then forwarded to the raw message unless that message reports
    ``locked``.

    Args:
        multiple: Passed through unchanged to the raw message's ``nack``.
        requeue: Passed through unchanged to the raw message's ``nack``.
    """
    incoming = self.raw_message
    await super().nack()
    # NOTE(review): `locked` presumably means the raw message was already
    # settled, so it must not be nack-ed again -- confirm.
    if not incoming.locked:
        await incoming.nack(multiple=multiple, requeue=requeue)

reject async #

reject(requeue=False)

Reject the RabbitMQ message.

Source code in faststream/rabbit/message.py
async def reject(
    self,
    requeue: bool = False,
) -> None:
    """Reject the RabbitMQ message.

    The parent class's ``reject`` is awaited first; the rejection is then
    forwarded to the raw message unless that message reports ``locked``.

    Args:
        requeue: Passed through unchanged to the raw message's ``reject``.
    """
    incoming = self.raw_message
    await super().reject()
    # NOTE(review): `locked` presumably means the raw message was already
    # settled, so it must not be rejected again -- confirm.
    if not incoming.locked:
        await incoming.reject(requeue=requeue)