Skip to content

KafkaMessage

faststream.confluent.message.KafkaMessage #

KafkaMessage(*args: Any, consumer: ConsumerProtocol, is_manual: bool = False, **kwargs: Any)

Bases: StreamMessage[Message]

Represents a Kafka message in the FastStream framework.

This class extends StreamMessage and is specialized for handling Confluent Kafka Message objects.

METHOD DESCRIPTION
ack

Acknowledge the Kafka message.

nack

Negative acknowledgment of the Kafka message.

reject

Reject the Kafka message.

Constructor method for the KafkaMessage class.

PARAMETER DESCRIPTION
*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

consumer

The Kafka consumer that received the message.

TYPE: ConsumerProtocol

is_manual

Whether the message is acknowledged manually: when True, ack() commits through the consumer.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in faststream/confluent/message.py
def __init__(
    self,
    *args: Any,
    consumer: ConsumerProtocol,
    is_manual: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize the message and remember the consumer that received it.

    Args:
        *args: Positional arguments forwarded unchanged to the parent
            initializer.
        consumer: The Kafka consumer that received the message; stored on
            the instance as ``consumer``.
        is_manual: Manual-mode flag, stored on the instance as
            ``is_manual``. Defaults to False.
        **kwargs: Keyword arguments forwarded unchanged to the parent
            initializer.
    """
    # Everything that is not named explicitly above goes to the parent.
    super().__init__(*args, **kwargs)

    self.consumer = consumer
    self.is_manual = is_manual

body instance-attribute #

body: Union[bytes, Any]

committed class-attribute instance-attribute #

committed: bool = field(default=False, init=False)

consumer instance-attribute #

consumer = consumer

content_type class-attribute instance-attribute #

content_type: Optional[str] = None

correlation_id class-attribute instance-attribute #

correlation_id: str = field(default_factory=lambda: str(uuid4()))

decoded_body class-attribute instance-attribute #

decoded_body: Optional[DecodedMessage] = None

headers class-attribute instance-attribute #

headers: AnyDict = field(default_factory=dict)

is_manual instance-attribute #

is_manual = is_manual

message_id class-attribute instance-attribute #

message_id: str = field(default_factory=lambda: str(uuid4()))

path class-attribute instance-attribute #

path: AnyDict = field(default_factory=dict)

processed class-attribute instance-attribute #

processed: bool = field(default=False, init=False)

raw_message instance-attribute #

raw_message: Msg

reply_to class-attribute instance-attribute #

reply_to: str = ''

ack async #

ack(**kwargs: Any) -> None

Acknowledge the Kafka message.

PARAMETER DESCRIPTION
**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

This method does not return a value.

TYPE: None

Source code in faststream/confluent/message.py
async def ack(self, **kwargs: Any) -> None:
    """Acknowledge the Kafka message.

    In manual mode, a message that has not been committed yet is committed
    through the consumer. The parent ``ack`` is then awaited in every mode,
    so the acknowledgement is also recorded when no explicit commit was
    needed (auto-commit mode, or a message that was already committed).

    Args:
        **kwargs: Accepted for interface compatibility; not used by this
            implementation.

    Returns:
        None: This method does not return a value.
    """
    if self.is_manual and not self.committed:
        await self.consumer.commit()

    # Previously this call sat inside the guard above, so it was skipped
    # whenever no explicit commit happened.
    # NOTE(review): the parent ack presumably sets ``committed`` the same way
    # the inherited nack/reject do -- confirm in faststream/broker/message.py.
    await super().ack()

nack async #

nack(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def nack(self, **kwargs: Any) -> None:
    """Negatively acknowledge the message.

    This implementation only flags the message as committed on the
    instance; it performs no other action.

    Args:
        **kwargs: Accepted for interface compatibility; ignored here.
    """
    self.committed = True

reject async #

reject(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def reject(self, **kwargs: Any) -> None:
    """Reject the message.

    This implementation only flags the message as committed on the
    instance; it performs no other action.

    Args:
        **kwargs: Accepted for interface compatibility; ignored here.
    """
    self.committed = True