Skip to content

parse_handler_params

faststream.asyncapi.message.parse_handler_params #

parse_handler_params(call: CallModel[Any, Any], prefix: str = '') -> Dict[str, Any]

Parses the handler parameters.

PARAMETER DESCRIPTION
call

The call model.

TYPE: CallModel[Any, Any]

prefix

The prefix for the model schema.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
Dict[str, Any]

A dictionary containing the parsed parameters.

Source code in faststream/asyncapi/message.py
def parse_handler_params(call: CallModel[Any, Any], prefix: str = "") -> Dict[str, Any]:
    """Parses the handler parameters.

    Args:
        call: The call model.
        prefix: The prefix for the model schema.

    Returns:
        A dictionary containing the parsed parameters.

    """
    model = call.model
    assert model  # nosec B101
    body = get_model_schema(
        create_model(  # type: ignore[call-overload]
            model.__name__,
            **call.flat_params,  # type: ignore[arg-type]
        ),
        prefix=prefix,
        exclude=tuple(call.custom_fields.keys()),
    )

    if body is None:
        return {"title": "EmptyPayload", "type": "null"}

    return body