KafkaMessage ( * args , consumer , is_manual = False , ** kwargs )
Bases: StreamMessage [Union ['Message', Tuple ['Message', ...]]]
Represents a Kafka message in the FastStream framework.
This class extends StreamMessage
and is specialized for handling confluent_kafka.Message objects.
Source code in faststream/confluent/message.py
def __init__(
    self,
    *args: Any,
    consumer: ConsumerProtocol,
    is_manual: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize the message and keep the objects needed to ack/nack it.

    Args:
        *args: Positional arguments forwarded unchanged to the parent
            initializer.
        consumer: Consumer stored on the instance; ``ack`` calls its
            ``commit`` and ``nack`` calls its ``seek``.
        is_manual: Stored on the instance; ``ack`` and ``nack`` only touch
            the consumer when this is true.
        **kwargs: Keyword arguments forwarded unchanged to the parent
            initializer.
    """
    super().__init__(*args, **kwargs)

    # Kafka-specific state, set after the parent has initialized itself.
    self.consumer = consumer
    self.is_manual = is_manual
raw_message instance-attribute
path class-attribute
instance-attribute
content_type class-attribute
instance-attribute
reply_to class-attribute
instance-attribute
message_id class-attribute
instance-attribute
correlation_id class-attribute
instance-attribute
processed class-attribute
instance-attribute
processed = field ( default = False , init = False )
committed class-attribute
instance-attribute
committed = field ( default = None , init = False )
decoded_body property
writable
is_manual instance-attribute
consumer instance-attribute
reject async
Source code in faststream/broker/message.py
async def reject(self) -> None:
    """Mark the message as rejected unless a commit status is already set."""
    if self.committed:
        # A status has already been recorded; leave it untouched.
        return
    self.committed = AckStatus.rejected
decode async
Return the decoded message body.
Source code in faststream/broker/message.py
async def decode(self) -> Optional["DecodedMessage"]:
    """Return the decoded message body held by this message.

    Returns:
        The current value of ``self._decoded_body``; per the return
        annotation this may be ``None``.
    """
    # TODO: make it lazy after `decoded_body` removed
    body = self._decoded_body
    return body
ack async
Acknowledge the Kafka message.
Source code in faststream/confluent/message.py
async def ack(self) -> None:
    """Acknowledge the Kafka message.

    When the message is handled manually and no commit status has been
    recorded yet, the consumer's ``commit()`` is awaited first. The parent
    ``ack`` is awaited in every case.
    """
    needs_commit = self.is_manual and not self.committed
    if needs_commit:
        await self.consumer.commit()

    await super().ack()
nack async
Negatively acknowledge the Kafka message.
Source code in faststream/confluent/message.py
async def nack(self) -> None:
    """Negatively acknowledge the Kafka message.

    When the message is handled manually and no commit status has been
    recorded yet, the consumer is asked to ``seek`` to this message's topic,
    partition and offset. The parent ``nack`` is awaited in every case.
    """
    if self.is_manual and not self.committed:
        target = self.raw_message
        if isinstance(target, tuple):
            # A tuple of messages: the seek position comes from the first one.
            target = target[0]

        await self.consumer.seek(
            topic=target.topic(),
            partition=target.partition(),
            offset=target.offset(),
        )

    await super().nack()