@classmethod
async def parse_message(
cls,
message: Union[OneMessage, BatchMessage],
) -> Union[OneRedisMessage, BatchRedisMessage]:
id_ = str(uuid4())
if message["type"] == "batch":
data = dump_json([cls.parse_one_msg(x)[0] for x in message["data"]])
return BatchRedisMessage(
raw_message=message,
body=data,
content_type="application/json",
message_id=id_,
correlation_id=id_,
)
else:
data, headers = cls.parse_one_msg(message["data"])
channel = message.get("channel", b"").decode()
handler: Optional["Handler"] = context.get_local("handler_")
if (
handler is not None
and handler.channel is not None
and (path_re := handler.channel.path_regex) is not None
and (match := path_re.match(channel)) is not None
):
path = match.groupdict()
else:
path = {}
return OneRedisMessage(
raw_message=message,
body=data,
path=path,
headers=headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type", ""),
message_id=message.get("message_id", id_),
correlation_id=headers.get("correlation_id", id_),
)