async def parse_message(
    self,
    message: Union["ConsumerRecord", "KafkaRawMessage"],
) -> "StreamMessage[ConsumerRecord]":
    """Parse a Kafka message into an instance of ``self.msg_class``.

    Args:
        message: The consumed record. Its ``headers``, ``value``, ``offset``,
            ``timestamp`` and ``topic`` attributes are read; an optional
            ``consumer`` attribute is used when present and truthy.

    Returns:
        A ``self.msg_class`` instance built from the record, with
        ``message`` itself kept as ``raw_message``.
    """
    # ``message.headers`` is iterated as (key, value) pairs and every value
    # is decoded with ``.decode()`` (default codec), so values must be
    # bytes-like. A repeated key keeps the last value seen.
    headers = {key: value.decode() for key, value in message.headers}
    handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

    # Generate a fresh correlation id only when the header is missing.
    # ``dict.get(key, gen_cor_id())`` would build (and discard) a new id for
    # every message that already carries one. A present key is returned
    # as-is, even if its value is an empty string.
    if "correlation_id" in headers:
        correlation_id = headers["correlation_id"]
    else:
        correlation_id = gen_cor_id()

    # Consumer lookup order: the record itself, then the handler stored in
    # the local context, then the module-level fallback.
    consumer = (
        getattr(message, "consumer", None)
        or getattr(handler, "consumer", None)
        or FAKE_CONSUMER
    )

    return self.msg_class(
        # A falsy value (e.g. ``None``) is normalised to empty bytes.
        body=message.value or b"",
        headers=headers,
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        message_id=f"{message.offset}-{message.timestamp}",
        correlation_id=correlation_id,
        raw_message=message,
        path=self.get_path(message.topic),
        consumer=consumer,
    )