Skip to content

RabbitMessage

faststream.rabbit.message.RabbitMessage dataclass #

RabbitMessage(raw_message: Msg, body: Union[bytes, Any], decoded_body: Optional[DecodedMessage] = None, headers: AnyDict = dict(), path: AnyDict = dict(), content_type: Optional[str] = None, reply_to: str = '', message_id: str = (lambda: str(uuid4()))(), correlation_id: str = (lambda: str(uuid4()))())

Bases: StreamMessage[IncomingMessage]

A message class for working with RabbitMQ messages.

This class extends StreamMessage to provide additional functionality for acknowledging, rejecting, or nack-ing RabbitMQ messages.

METHOD DESCRIPTION
ack

Acknowledge the RabbitMQ message.

nack

Negative Acknowledgment of the RabbitMQ message.

reject

Reject the RabbitMQ message.

body instance-attribute #

body: Union[bytes, Any]

committed class-attribute instance-attribute #

committed: bool = field(default=False, init=False)

content_type class-attribute instance-attribute #

content_type: Optional[str] = None

correlation_id class-attribute instance-attribute #

correlation_id: str = field(default_factory=lambda: str(uuid4()))

decoded_body class-attribute instance-attribute #

decoded_body: Optional[DecodedMessage] = None

headers class-attribute instance-attribute #

headers: AnyDict = field(default_factory=dict)

message_id class-attribute instance-attribute #

message_id: str = field(default_factory=lambda: str(uuid4()))

path class-attribute instance-attribute #

path: AnyDict = field(default_factory=dict)

processed class-attribute instance-attribute #

processed: bool = field(default=False, init=False)

raw_message instance-attribute #

raw_message: Msg

reply_to class-attribute instance-attribute #

reply_to: str = ''

ack async #

ack(**kwargs: Any) -> None

Acknowledge the RabbitMQ message.

Acknowledgment indicates that the message has been successfully processed.

PARAMETER DESCRIPTION
**kwargs

Additional keyword arguments (not used).

TYPE: Any DEFAULT: {}

Source code in faststream/rabbit/message.py
async def ack(self, **kwargs: Any) -> None:
    """Acknowledge the RabbitMQ message.

    Awaits the parent class's ``ack`` first, then acknowledges the
    underlying raw message unless one of its ``__processed`` / ``__no_ack``
    flags is already set.

    Args:
        **kwargs (Any): Additional keyword arguments (not used).

    """
    incoming = self.raw_message
    await super().ack()

    # Name-mangled private flags of the raw message; when either is truthy
    # the broker-level acknowledgment is skipped.
    already_settled = (
        incoming._IncomingMessage__processed  # type: ignore[attr-defined]
        or incoming._IncomingMessage__no_ack  # type: ignore[attr-defined]
    )
    if not already_settled:
        await incoming.ack()

nack async #

nack(**kwargs: Any) -> None

Negative Acknowledgment of the RabbitMQ message.

Nack-ing a message indicates that processing of the message has failed and that the message should be requeued.

PARAMETER DESCRIPTION
**kwargs

Additional keyword arguments (not used).

TYPE: Any DEFAULT: {}

Source code in faststream/rabbit/message.py
async def nack(self, **kwargs: Any) -> None:
    """Negatively acknowledge the RabbitMQ message.

    Nack-ing signals that processing of the message has failed and that the
    message should be requeued. Awaits the parent class's ``nack`` first,
    then nacks the underlying raw message unless one of its
    ``__processed`` / ``__no_ack`` flags is already set.

    Args:
        **kwargs (Any): Additional keyword arguments (not used).

    """
    incoming = self.raw_message
    await super().nack()

    # Name-mangled private flags of the raw message; when either is truthy
    # the broker-level nack is skipped.
    already_settled = (
        incoming._IncomingMessage__processed  # type: ignore[attr-defined]
        or incoming._IncomingMessage__no_ack  # type: ignore[attr-defined]
    )
    if not already_settled:
        await incoming.nack()

reject async #

reject(**kwargs: Any) -> None

Reject the RabbitMQ message.

Rejecting a message indicates that the message processing has failed, and it should not be requeued.

PARAMETER DESCRIPTION
**kwargs

Additional keyword arguments (not used).

TYPE: Any DEFAULT: {}

Source code in faststream/rabbit/message.py
async def reject(self, **kwargs: Any) -> None:
    """Reject the RabbitMQ message.

    Rejecting signals that processing of the message has failed and that the
    message should not be requeued. Awaits the parent class's ``reject``
    first, then rejects the underlying raw message unless one of its
    ``__processed`` / ``__no_ack`` flags is already set.

    Args:
        **kwargs (Any): Additional keyword arguments (not used).

    """
    incoming = self.raw_message
    await super().reject()

    # Name-mangled private flags of the raw message; when either is truthy
    # the broker-level reject is skipped.
    already_settled = (
        incoming._IncomingMessage__processed  # type: ignore[attr-defined]
        or incoming._IncomingMessage__no_ack  # type: ignore[attr-defined]
    )
    if not already_settled:
        await incoming.reject()