Skip to content

KafkaMessage

faststream.confluent.message.KafkaMessage #

KafkaMessage(*args, consumer, is_manual=False, **kwargs)

Bases: StreamMessage[Union['Message', Tuple['Message', ...]]]

Represents a Kafka message in the FastStream framework.

This class extends StreamMessage and is specialized for handling confluent_kafka.Message objects.

Source code in faststream/confluent/message.py
def __init__(
    self,
    *args: Any,
    consumer: ConsumerProtocol,
    is_manual: bool = False,
    **kwargs: Any,
) -> None:
    super().__init__(*args, **kwargs)

    self.is_manual = is_manual
    self.consumer = consumer

raw_message instance-attribute #

raw_message

body instance-attribute #

body

headers class-attribute instance-attribute #

headers = field(default_factory=dict)

batch_headers class-attribute instance-attribute #

batch_headers = field(default_factory=list)

path class-attribute instance-attribute #

path = field(default_factory=dict)

content_type class-attribute instance-attribute #

content_type = None

reply_to class-attribute instance-attribute #

reply_to = ''

message_id class-attribute instance-attribute #

message_id = field(default_factory=gen_cor_id)

correlation_id class-attribute instance-attribute #

correlation_id = field(default_factory=gen_cor_id)

processed class-attribute instance-attribute #

processed = field(default=False, init=False)

committed class-attribute instance-attribute #

committed = field(default=None, init=False)

decoded_body property writable #

decoded_body

is_manual instance-attribute #

is_manual = is_manual

consumer instance-attribute #

consumer = consumer

reject async #

reject()
Source code in faststream/broker/message.py
async def reject(self) -> None:
    if not self.committed:
        self.committed = AckStatus.rejected

decode async #

decode()

Serialize the message by lazy decoder.

Source code in faststream/broker/message.py
async def decode(self) -> Optional["DecodedMessage"]:
    """Serialize the message by lazy decoder."""
    # TODO: make it lazy after `decoded_body` removed
    return self._decoded_body

ack async #

ack()

Acknowledge the Kafka message.

Source code in faststream/confluent/message.py
async def ack(self) -> None:
    """Acknowledge the Kafka message."""
    if self.is_manual and not self.committed:
        await self.consumer.commit()
    await super().ack()

nack async #

nack()

Reject the Kafka message.

Source code in faststream/confluent/message.py
async def nack(self) -> None:
    """Reject the Kafka message."""
    if self.is_manual and not self.committed:
        raw_message = (
            self.raw_message[0]
            if isinstance(self.raw_message, tuple)
            else self.raw_message
        )
        await self.consumer.seek(
            topic=raw_message.topic(),
            partition=raw_message.partition(),
            offset=raw_message.offset(),
        )
    await super().nack()