Skip to content

AsyncConfluentParser

faststream.confluent.parser.AsyncConfluentParser #

A class to parse Kafka messages.

decode_message async staticmethod #

decode_message(msg: StreamMessage[Message]) -> DecodedMessage

Decodes a message.

PARAMETER DESCRIPTION
msg

The message to be decoded.

TYPE: StreamMessage[Message]

RETURNS DESCRIPTION
DecodedMessage

The decoded message.

Source code in faststream/confluent/parser.py
@staticmethod
async def decode_message(msg: StreamMessage[Message]) -> DecodedMessage:
    """Decode the body of a single parsed message.

    This is a thin async wrapper: the work is delegated to the
    ``decode_message`` function resolved from the enclosing module scope.

    Args:
        msg: The parsed stream message whose content should be decoded.

    Returns:
        Whatever the underlying ``decode_message`` helper produces for ``msg``.

    """
    decoded = decode_message(msg)
    return decoded

decode_message_batch async classmethod #

decode_message_batch(msg: StreamMessage[Tuple[Message, ...]]) -> List[DecodedMessage]

Decode a batch of messages.

PARAMETER DESCRIPTION
msg

A stream message containing a tuple of consumer records.

TYPE: StreamMessage[Tuple[Message, ...]]

RETURNS DESCRIPTION
List[DecodedMessage]

A list of decoded messages.

Source code in faststream/confluent/parser.py
@classmethod
async def decode_message_batch(
    cls, msg: StreamMessage[Tuple[Message, ...]]
) -> List[DecodedMessage]:
    """Decode every raw message carried by a batch stream message.

    Each element of ``msg.raw_message`` is first parsed with
    ``cls.parse_message`` and the result is then passed to ``decode_message``.

    Args:
        msg: A stream message whose ``raw_message`` is the tuple of raw
            messages forming the batch.

    Returns:
        The decoded messages, in the same order as ``msg.raw_message``.

    """
    decoded_items = []
    for raw in msg.raw_message:
        # Parse first, then decode; messages are handled one after another.
        parsed = await cls.parse_message(raw)
        decoded_items.append(decode_message(parsed))
    return decoded_items

parse_message async staticmethod #

parse_message(message: Message) -> StreamMessage[Message]

Parses a Kafka message.

PARAMETER DESCRIPTION
message

The Kafka message to parse.

TYPE: Message

RETURNS DESCRIPTION
StreamMessage[Message]

A StreamMessage object representing the parsed message.

Source code in faststream/confluent/parser.py
@staticmethod
async def parse_message(
    message: Message,
) -> StreamMessage[Message]:
    """Parse a single Kafka message into a ``KafkaMessage``.

    Header values that are not already ``str`` are converted with
    ``.decode()``. The ``reply_to``, ``content-type`` and ``correlation_id``
    headers populate the matching fields of the result; when no
    ``correlation_id`` header is present a fresh UUID4 string is used.
    The message id is built as ``"{offset}-{timestamp}"``.

    Args:
        message: The Kafka message to parse.

    Returns:
        A StreamMessage object representing the parsed message.

    """
    # Fetch the headers once: this avoids building the header list twice and
    # lets the ``None`` check below apply to the very value that is iterated.
    raw_headers = message.headers()
    headers = {}
    if raw_headers is not None:
        for key, value in raw_headers:
            headers[key] = value if isinstance(value, str) else value.decode()

    body = message.value()
    offset = message.offset()
    # Only the second element of the timestamp pair is used for the id.
    _, timestamp = message.timestamp()

    # The handler is optional; without one the fake consumer and manual mode
    # are used as fallbacks.
    handler: Optional["Handler"] = context.get_local("handler_")
    return KafkaMessage(
        body=body,
        headers=headers,
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        message_id=f"{offset}-{timestamp}",
        correlation_id=headers.get("correlation_id", str(uuid4())),
        raw_message=message,
        consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
        is_manual=getattr(handler, "is_manual", True),
    )

parse_message_batch async staticmethod #

parse_message_batch(message: Tuple[Message, ...]) -> KafkaMessage

Parses a batch of messages from a Kafka consumer.

PARAMETER DESCRIPTION
message

A tuple of Message objects representing the batch of messages to parse.

RETURNS DESCRIPTION
KafkaMessage

A StreamMessage object containing the parsed messages.

RAISES DESCRIPTION
NotImplementedError

Listed in the method's docstring, but the implementation shown below contains no code path that raises this exception.

Static Method

This method is a static method. It does not require an instance of the class to be called.

Source code in faststream/confluent/parser.py
@staticmethod
async def parse_message_batch(
    message: Tuple[Message, ...],
) -> KafkaMessage:
    """Parse a batch of Kafka messages into one ``KafkaMessage``.

    The headers of the first message in the batch are used for the whole
    batch (non-``str`` values are converted with ``.decode()``). The body is
    the list of every message's value, and the message id is built as
    ``"{first_offset}-{last_offset}-{first_timestamp}"``.

    Args:
        message: A tuple of Message objects forming the batch. It is indexed
            at ``[0]`` and ``[-1]``, so it must contain at least one message.

    Returns:
        A KafkaMessage whose ``raw_message`` is the original tuple.

    """
    first = message[0]
    last = message[-1]

    # Fetch the headers once: this avoids building the header list twice and
    # lets the ``None`` check below apply to the very value that is iterated.
    raw_headers = first.headers()
    headers = {}
    if raw_headers is not None:
        for key, value in raw_headers:
            headers[key] = value if isinstance(value, str) else value.decode()

    body = [m.value() for m in message]
    first_offset = first.offset()
    last_offset = last.offset()
    # Only the second element of the first message's timestamp pair is used.
    _, first_timestamp = first.timestamp()

    # The handler is optional; without one the fake consumer and manual mode
    # are used as fallbacks.
    handler: Optional["Handler"] = context.get_local("handler_")
    return KafkaMessage(
        body=body,
        headers=headers,
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        message_id=f"{first_offset}-{last_offset}-{first_timestamp}",
        correlation_id=headers.get("correlation_id", str(uuid4())),
        raw_message=message,
        consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
        is_manual=getattr(handler, "is_manual", True),
    )