Bases: StreamMessage[Union['ConsumerRecord', Tuple['ConsumerRecord', ...]]]
Represents a Kafka message in the FastStream framework.
This class extends StreamMessage
and is specialized for handling Kafka ConsumerRecord objects.
Source code in faststream/kafka/message.py
def __init__(self, *args: Any, consumer: ConsumerProtocol, **kwargs: Any) -> None:
    """Create the message and remember the consumer that delivered it.

    All positional arguments and the remaining keyword arguments are
    forwarded unchanged to the parent initializer. ``consumer`` is
    keyword-only and is stored on the instance; ``nack`` later calls
    ``seek`` on it.
    """
    super().__init__(*args, **kwargs)
    self.consumer = consumer
raw_message instance-attribute
path class-attribute
instance-attribute
content_type class-attribute
instance-attribute
reply_to class-attribute
instance-attribute
message_id class-attribute
instance-attribute
correlation_id class-attribute
instance-attribute
processed class-attribute
instance-attribute
processed = field(default=False, init=False)
committed class-attribute
instance-attribute
committed = field(default=None, init=False)
decoded_body property
writable
consumer instance-attribute
ack async
Source code in faststream/broker/message.py
async def ack(self) -> None:
    """Record ``AckStatus.acked`` unless a status is already set.

    A truthy ``self.committed`` means the message was settled before, so
    the existing status is left untouched.
    """
    if self.committed:
        return
    self.committed = AckStatus.acked
reject async
Source code in faststream/broker/message.py
async def reject(self) -> None:
    """Record ``AckStatus.rejected`` unless a status is already set.

    A truthy ``self.committed`` means the message was settled before, so
    the existing status is left untouched.
    """
    if self.committed:
        return
    self.committed = AckStatus.rejected
decode async
Return the decoded (deserialized) message body; decoding is not yet lazy — see the TODO in the source.
Source code in faststream/broker/message.py
async def decode(self) -> Optional["DecodedMessage"]:
    """Return the decoded body stored on this message.

    No decoding happens here yet: the value already held in
    ``self._decoded_body`` is handed back as-is.
    """
    # TODO: make it lazy after `decoded_body` removed
    decoded = self._decoded_body
    return decoded
nack async
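Negatively acknowledge the Kafka message: unless it is already committed, seek the consumer back to the message's offset and delegate to the base-class nack.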
Source code in faststream/kafka/message.py
async def nack(self) -> None:
    """Negatively acknowledge the message by rewinding the consumer.

    Does nothing when ``self.committed`` is already set. Otherwise the
    consumer is sought back to the offset of the consumed record and the
    base-class ``nack`` is awaited.

    ``self.raw_message`` is either a single record or a tuple of records
    (a batch). For a batch, every topic-partition that contributed a
    record is sought back to the lowest offset seen for it, instead of
    only the partition of the first record. An empty tuple triggers no
    seek at all.
    """
    if self.committed:
        return

    records = (
        self.raw_message
        if isinstance(self.raw_message, tuple)
        else (self.raw_message,)
    )

    # Lowest offset per (topic, partition); taking the minimum avoids
    # relying on the order of records inside the batch.
    earliest = {}
    for record in records:
        key = (record.topic, record.partition)
        if key not in earliest or record.offset < earliest[key]:
            earliest[key] = record.offset

    for (topic, partition), offset in earliest.items():
        self.consumer.seek(
            partition=AIOKafkaTopicPartition(topic, partition),
            offset=offset,
        )

    await super().nack()