Skip to content

decode_message

faststream.broker.parsers.decode_message #

decode_message(
    message: StreamMessage[Any],
) -> DecodedMessage

Decodes a message.

PARAMETER DESCRIPTION
message

The message to decode.

TYPE: StreamMessage[Any]

RETURNS DESCRIPTION
DecodedMessage

The decoded message.

RAISES DESCRIPTION
JSONDecodeError

If the message body cannot be decoded as JSON.

Source code in faststream/broker/parsers.py
def decode_message(message: StreamMessage[Any]) -> DecodedMessage:
    """Decodes a message.

    Args:
        message: The message to decode.

    Returns:
        The decoded message.

    Raises:
        JSONDecodeError: If the message body cannot be decoded as JSON.

    """
    body: Any = getattr(message, "body", message)
    m: DecodedMessage = body

    if content_type := getattr(message, "content_type", None):
        if ContentTypes.text.value in content_type:
            m = body.decode()
        elif ContentTypes.json.value in content_type:  # pragma: no branch
            m = json_loads(body)

    else:
        with suppress(json.JSONDecodeError):
            m = json_loads(body)

    return m