asyncdefnack(self)->None:"""Reject the Kafka message."""ifnotself.committed:raw_message=(self.raw_message[0]ifisinstance(self.raw_message,tuple)elseself.raw_message)topic_partition=AIOKafkaTopicPartition(raw_message.topic,raw_message.partition,)self.consumer.seek(partition=topic_partition,offset=raw_message.offset,)awaitsuper().nack()
asyncdefdecode(self)->Optional["DecodedMessage"]:"""Serialize the message by lazy decoder."""# TODO: make it lazy after `decoded_body` removedreturnself._decoded_body