Skip to content

KafkaMessage

faststream.kafka.message.KafkaMessage #

KafkaMessage(*args: Any, consumer: ConsumerProtocol, is_manual: bool = False, **kwargs: Any)

Bases: StreamMessage[ConsumerRecord]

Represents a Kafka message in the FastStream framework.

This class extends StreamMessage and is specialized for handling Kafka ConsumerRecord objects.

METHOD DESCRIPTION
ack

Acknowledge the Kafka message.

nack

Negative acknowledgment of the Kafka message.

reject

Reject the Kafka message.

Initialize the KafkaMessage object.

PARAMETER DESCRIPTION
*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

consumer

The Kafka consumer.

TYPE: AIOKafkaConsumer

is_manual

Whether the message is manually acknowledged.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in faststream/kafka/message.py
def __init__(
    self,
    *args: Any,
    consumer: ConsumerProtocol,
    is_manual: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize the KafkaMessage object.

    Args:
        *args (Any): Additional positional arguments.
        consumer (aiokafka.AIOKafkaConsumer): The Kafka consumer.
        is_manual (bool): Whether the message is manually acknowledged.
        **kwargs (Any): Additional keyword arguments.
    """
    super().__init__(*args, **kwargs)

    self.is_manual = is_manual
    self.consumer = consumer

body instance-attribute #

body: Union[bytes, Any]

committed class-attribute instance-attribute #

committed: bool = field(default=False, init=False)

consumer instance-attribute #

consumer = consumer

content_type class-attribute instance-attribute #

content_type: Optional[str] = None

correlation_id class-attribute instance-attribute #

correlation_id: str = field(default_factory=lambda: str(uuid4()))

decoded_body class-attribute instance-attribute #

decoded_body: Optional[DecodedMessage] = None

headers class-attribute instance-attribute #

headers: AnyDict = field(default_factory=dict)

is_manual instance-attribute #

is_manual = is_manual

message_id class-attribute instance-attribute #

message_id: str = field(default_factory=lambda: str(uuid4()))

path class-attribute instance-attribute #

path: AnyDict = field(default_factory=dict)

processed class-attribute instance-attribute #

processed: bool = field(default=False, init=False)

raw_message instance-attribute #

raw_message: Msg

reply_to class-attribute instance-attribute #

reply_to: str = ''

ack async #

ack(**kwargs: Any) -> None

Acknowledge the Kafka message.

PARAMETER DESCRIPTION
**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

This method does not return a value.

TYPE: None

Source code in faststream/kafka/message.py
async def ack(self, **kwargs: Any) -> None:
    """Acknowledge the Kafka message.

    Args:
        **kwargs (Any): Additional keyword arguments.

    Returns:
        None: This method does not return a value.
    """
    if self.is_manual and not self.committed:
        await self.consumer.commit()
        await super().ack()

nack async #

nack(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def nack(self, **kwargs: Any) -> None:
    self.committed = True

reject async #

reject(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def reject(self, **kwargs: Any) -> None:
    self.committed = True