Bases: StreamMessage [ConsumerRecord ]
Represents a Kafka message in the FastStream framework.
This class extends StreamMessage
and is specialized for handling Kafka ConsumerRecord objects.
Initialize the KafkaMessage object.
Source code in faststream/kafka/message.py
def __init__ (
self ,
* args : Any ,
consumer : ConsumerProtocol ,
is_manual : bool = False ,
** kwargs : Any ,
) -> None :
"""Initialize the KafkaMessage object.
Args:
*args (Any): Additional positional arguments.
consumer (aiokafka.AIOKafkaConsumer): The Kafka consumer.
is_manual (bool): Whether the message is manually acknowledged.
**kwargs (Any): Additional keyword arguments.
"""
super () . __init__ ( * args , ** kwargs )
self . is_manual = is_manual
self . consumer = consumer
committed class-attribute
instance-attribute
committed : bool = field ( default = False , init = False )
consumer instance-attribute
content_type class-attribute
instance-attribute
correlation_id class-attribute
instance-attribute
decoded_body class-attribute
instance-attribute
decoded_body : Optional [ DecodedMessage ] = None
headers : AnyDict = field ( default_factory = dict )
is_manual instance-attribute
message_id class-attribute
instance-attribute
path class-attribute
instance-attribute
processed class-attribute
instance-attribute
processed : bool = field ( default = False , init = False )
raw_message instance-attribute
reply_to class-attribute
instance-attribute
ack async
ack ( ** kwargs : Any ) -> None
Acknowledge the Kafka message.
Source code in faststream/kafka/message.py
async def ack ( self , ** kwargs : Any ) -> None :
"""Acknowledge the Kafka message.
Args:
**kwargs (Any): Additional keyword arguments.
Returns:
None: This method does not return a value.
"""
if self . is_manual and not self . committed :
await self . consumer . commit ()
await super () . ack ()
nack async
nack ( ** kwargs : Any ) -> None
Source code in faststream/broker/message.py
async def nack ( self , ** kwargs : Any ) -> None :
self . committed = True
reject async
reject ( ** kwargs : Any ) -> None
Source code in faststream/broker/message.py
async def reject ( self , ** kwargs : Any ) -> None :
self . committed = True