async def parse_message(
self,
message: Mapping[str, Any],
) -> "StreamMessage[Mapping[str, Any]]":
data, headers, batch_headers = self._parse_data(message)
id_ = gen_cor_id()
return self.msg_class(
raw_message=message,
body=data,
path=self.get_path(message),
headers=headers,
batch_headers=batch_headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
message_id=headers.get("message_id", id_),
correlation_id=headers.get("correlation_id", id_),
)