A class to parse Kafka messages.
Source code in faststream/kafka/parser.py
| def __init__(
self,
msg_class: Type[KafkaMessage],
regex: Optional["Pattern[str]"],
) -> None:
self.msg_class = msg_class
self.regex = regex
|
msg_class instance-attribute
parse_message async
Parses a Kafka message.
Source code in faststream/kafka/parser.py
| async def parse_message(
self,
message: "ConsumerRecord",
) -> "StreamMessage[ConsumerRecord]":
"""Parses a Kafka message."""
headers = {i: j.decode() for i, j in message.headers}
handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")
return self.msg_class(
body=message.value,
headers=headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
message_id=f"{message.offset}-{message.timestamp}",
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
path=self.get_path(message.topic),
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
)
|
decode_message async
Decodes a message.
Source code in faststream/kafka/parser.py
| async def decode_message(
self,
msg: "StreamMessage[ConsumerRecord]",
) -> "DecodedMessage":
"""Decodes a message."""
return decode_message(msg)
|
get_path
Source code in faststream/kafka/parser.py
| def get_path(self, topic: str) -> Dict[str, str]:
if self.regex and (match := self.regex.match(topic)):
return match.groupdict()
else:
return {}
|