Skip to content

AsyncConfluentParser

faststream.confluent.parser.AsyncConfluentParser #

A class to parse Kafka messages.

parse_message async staticmethod #

parse_message(message)

Parses a Kafka message.

Source code in faststream/confluent/parser.py
@staticmethod
async def parse_message(
    message: "Message",
) -> KafkaMessage:
    """Wrap a single raw consumer message into a ``KafkaMessage``.

    Args:
        message: The raw message object; its ``headers()``, ``value()``,
            ``offset()`` and ``timestamp()`` accessors are read.

    Returns:
        A ``KafkaMessage`` whose id is ``"<offset>-<timestamp>"`` and whose
        ``reply_to`` / ``content_type`` / ``correlation_id`` come from the
        parsed headers.
    """
    # Missing headers are treated as an empty sequence before parsing.
    parsed_headers = _parse_msg_headers(message.headers() or ())

    # A falsy payload (e.g. None) is normalised to empty bytes.
    payload = message.value() or b""
    msg_offset = message.offset()
    # timestamp() yields a pair; only its second element is used for the id.
    _ts_kind, ts_value = message.timestamp()

    subscriber: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

    reply_to = parsed_headers.get("reply_to", "")
    content_type = parsed_headers.get("content-type")
    # The fallback id is generated eagerly, whether or not the header exists.
    correlation_id = parsed_headers.get("correlation_id", gen_cor_id())

    # With no handler in the local context (or no truthy consumer on it),
    # fall back to FAKE_CONSUMER; ``is_manual`` then defaults to True.
    consumer = getattr(subscriber, "consumer", None) or FAKE_CONSUMER
    manual = getattr(subscriber, "is_manual", True)

    return KafkaMessage(
        body=payload,
        headers=parsed_headers,
        reply_to=reply_to,
        content_type=content_type,
        message_id=f"{msg_offset}-{ts_value}",
        correlation_id=correlation_id,
        raw_message=message,
        consumer=consumer,
        is_manual=manual,
    )

parse_message_batch async staticmethod #

parse_message_batch(message)

Parses a batch of messages from a Kafka consumer.

Source code in faststream/confluent/parser.py
@staticmethod
async def parse_message_batch(
    message: Tuple["Message", ...],
) -> KafkaMessage:
    """Wrap a tuple of raw consumer messages into one batch ``KafkaMessage``.

    Args:
        message: The raw messages of the batch. Indexing the first and last
            element happens up front, so an empty tuple raises ``IndexError``.

    Returns:
        A ``KafkaMessage`` whose body is the list of payloads, whose
        ``headers`` are those of the first message, and whose id is
        ``"<first offset>-<last offset>-<first timestamp>"``.
    """
    head = message[0]
    tail = message[-1]

    # One pass over the batch: payload (falsy -> b"") and parsed headers
    # are collected per message, in that order.
    pairs = [
        (item.value() or b"", _parse_msg_headers(item.headers() or ()))
        for item in message
    ]
    body: List[Any] = [payload for payload, _ in pairs]
    batch_headers: List[Dict[str, str]] = [parsed for _, parsed in pairs]

    # The first message's headers stand in for the whole batch.
    headers = batch_headers[0] if batch_headers else {}

    # timestamp() yields a pair; only its second element is used for the id.
    _ts_kind, head_timestamp = head.timestamp()

    subscriber: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

    reply_to = headers.get("reply_to", "")
    content_type = headers.get("content-type")
    batch_id = f"{head.offset()}-{tail.offset()}-{head_timestamp}"
    # The fallback id is generated eagerly, whether or not the header exists.
    correlation_id = headers.get("correlation_id", gen_cor_id())

    # With no handler in the local context (or no truthy consumer on it),
    # fall back to FAKE_CONSUMER; ``is_manual`` then defaults to True.
    consumer = getattr(subscriber, "consumer", None) or FAKE_CONSUMER
    manual = getattr(subscriber, "is_manual", True)

    return KafkaMessage(
        body=body,
        headers=headers,
        batch_headers=batch_headers,
        reply_to=reply_to,
        content_type=content_type,
        message_id=batch_id,
        correlation_id=correlation_id,
        raw_message=message,
        consumer=consumer,
        is_manual=manual,
    )

decode_message async staticmethod #

decode_message(msg)

Decodes a message.

Source code in faststream/confluent/parser.py
@staticmethod
async def decode_message(
    msg: "StreamMessage[Message]",
) -> "DecodedMessage":
    """Decode an already-parsed stream message.

    Args:
        msg: The parsed message to decode.

    Returns:
        Whatever the ``decode_message`` helper returns for ``msg``.
    """
    # Inside the method body this name is looked up in module scope, not on
    # the class -- presumably the shared decode helper imported by the file.
    decoded = decode_message(msg)
    return decoded

decode_message_batch async classmethod #

decode_message_batch(msg)

Decode a batch of messages.

Source code in faststream/confluent/parser.py
@classmethod
async def decode_message_batch(
    cls,
    msg: "StreamMessage[Tuple[Message, ...]]",
) -> "DecodedMessage":
    """Decode every raw message contained in a batch stream message.

    Args:
        msg: The batch message; its ``raw_message`` is iterated item by item.

    Returns:
        A list with one decoded value per raw message, in iteration order.
    """
    decoded_items = []
    for raw in msg.raw_message:
        # Each raw item is parsed individually before being decoded.
        parsed = await cls.parse_message(raw)
        decoded_items.append(decode_message(parsed))
    return decoded_items