Skip to content

decode_message

faststream.broker.message.decode_message #

decode_message(message)

Decodes a message.

Source code in faststream/broker/message.py
def decode_message(message: "StreamMessage[Any]") -> "DecodedMessage":
    """Decodes a message."""
    body: Any = getattr(message, "body", message)
    m: DecodedMessage = body

    if (content_type := getattr(message, "content_type", EMPTY)) is not EMPTY:
        content_type = cast(Optional[str], content_type)

        if not content_type:
            with suppress(json.JSONDecodeError, UnicodeDecodeError):
                m = json_loads(body)

        elif ContentTypes.text.value in content_type:
            m = body.decode()

        elif ContentTypes.json.value in content_type:
            m = json_loads(body)

    else:
        with suppress(json.JSONDecodeError, UnicodeDecodeError):
            m = json_loads(body)

    return m