Skip to content

AioKafkaParser

faststream.kafka.parser.AioKafkaParser #

AioKafkaParser(msg_class, regex)

A class to parse Kafka messages.

Source code in faststream/kafka/parser.py
def __init__(
    self,
    msg_class: Type[KafkaMessage],
    regex: Optional["Pattern[str]"],
) -> None:
    """Remember the message class and the optional topic pattern.

    Args:
        msg_class: Class stored on the parser as ``self.msg_class``.
        regex: Compiled pattern stored as ``self.regex``; may be ``None``.
    """
    self.msg_class, self.regex = msg_class, regex

msg_class instance-attribute #

msg_class = msg_class

regex instance-attribute #

regex = regex

parse_message async #

parse_message(message)

Parses a Kafka message.

Source code in faststream/kafka/parser.py
async def parse_message(
    self,
    message: "ConsumerRecord",
) -> "StreamMessage[ConsumerRecord]":
    """Wrap a raw Kafka record in an instance of ``self.msg_class``.

    Header values are decoded from bytes with ``bytes.decode()`` and then
    used to fill the reply-to, content-type and correlation-id fields.
    A fresh correlation id is generated as the fallback value, and
    ``FAKE_CONSUMER`` is used when the locally stored handler offers no
    truthy ``consumer`` attribute.

    Args:
        message: The consumer record to convert.

    Returns:
        The object produced by calling ``self.msg_class`` with the
        extracted fields.
    """
    decoded_headers = {}
    for key, raw_value in message.headers:
        decoded_headers[key] = raw_value.decode()

    subscriber: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

    message_cls = self.msg_class
    # An empty or missing payload is normalised to empty bytes.
    payload = message.value or b""
    reply_to = decoded_headers.get("reply_to", "")
    content_type = decoded_headers.get("content-type")
    message_id = f"{message.offset}-{message.timestamp}"
    # The fallback id is produced up front, exactly as a ``dict.get`` default.
    correlation_id = decoded_headers.get("correlation_id", gen_cor_id())
    path = self.get_path(message.topic)
    consumer = getattr(subscriber, "consumer", None) or FAKE_CONSUMER

    return message_cls(
        body=payload,
        headers=decoded_headers,
        reply_to=reply_to,
        content_type=content_type,
        message_id=message_id,
        correlation_id=correlation_id,
        raw_message=message,
        path=path,
        consumer=consumer,
    )

decode_message async #

decode_message(msg)

Decodes a message.

Source code in faststream/kafka/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[ConsumerRecord]",
) -> "DecodedMessage":
    """Decode an already-parsed message.

    Delegates to the ``decode_message`` callable found in the enclosing
    module scope and returns its result unchanged.

    Args:
        msg: The parsed stream message to decode.

    Returns:
        Whatever the module-scope ``decode_message`` returns for ``msg``.
    """
    decoded = decode_message(msg)
    return decoded

get_path #

get_path(topic)
Source code in faststream/kafka/parser.py
def get_path(self, topic: str) -> Dict[str, str]:
    """Extract named groups from ``topic`` using the parser's pattern.

    Args:
        topic: Topic name to match from its start against ``self.regex``.

    Returns:
        The named groups of the match, or an empty dict when no pattern
        is configured or the topic does not match.
    """
    pattern = self.regex
    if not pattern:
        return {}

    found = pattern.match(topic)
    return found.groupdict() if found else {}