Skip to content

AioKafkaBatchParser

faststream.kafka.parser.AioKafkaBatchParser #

AioKafkaBatchParser(msg_class, regex)

Bases: AioKafkaParser

Source code in faststream/kafka/parser.py
def __init__(
    self,
    msg_class: Type[KafkaMessage],
    regex: Optional["Pattern[str]"],
) -> None:
    self.msg_class = msg_class
    self.regex = regex

msg_class instance-attribute #

msg_class = msg_class

regex instance-attribute #

regex = regex

get_path #

get_path(topic)
Source code in faststream/kafka/parser.py
def get_path(self, topic: str) -> Dict[str, str]:
    if self.regex and (match := self.regex.match(topic)):
        return match.groupdict()
    else:
        return {}

parse_message async #

parse_message(message)

Parses a batch of messages from a Kafka consumer.

Source code in faststream/kafka/parser.py
async def parse_message(
    self,
    message: Tuple["ConsumerRecord", ...],
) -> "StreamMessage[Tuple[ConsumerRecord, ...]]":
    """Parses a batch of messages from a Kafka consumer."""
    body: List[Any] = []
    batch_headers: List[Dict[str, str]] = []

    first = message[0]
    last = message[-1]

    for m in message:
        body.append(m.value or b"")
        batch_headers.append({i: j.decode() for i, j in m.headers})

    headers = next(iter(batch_headers), {})

    handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

    return self.msg_class(
        body=body,
        headers=headers,
        batch_headers=batch_headers,
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        message_id=f"{first.offset}-{last.offset}-{first.timestamp}",
        correlation_id=headers.get("correlation_id", gen_cor_id()),
        raw_message=message,
        path=self.get_path(first.topic),
        consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
    )

decode_message async #

decode_message(msg)

Decode a batch of messages.

Source code in faststream/kafka/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
) -> "DecodedMessage":
    """Decode a batch of messages."""
    # super() should be here due python can't find it in comprehension
    super_obj = cast(AioKafkaParser, super())

    return [
        decode_message(await super_obj.parse_message(m)) for m in msg.raw_message
    ]