Parses the handler parameters.
PARAMETER | DESCRIPTION |
call | The call model. TYPE: CallModel[Any, Any] |
prefix | The prefix for the model schema. TYPE: str DEFAULT: '' |
RETURNS | DESCRIPTION |
Dict[str, Any] | A dictionary containing the parsed parameters. |
Source code in faststream/asyncapi/message.py
def parse_handler_params(call: CallModel[Any, Any], prefix: str = "") -> Dict[str, Any]:
    """Build the payload schema for a handler from its call model.

    Args:
        call: The call model; ``call.model`` must be truthy (asserted).
        prefix: The prefix for the model schema.

    Returns:
        The schema dictionary returned by ``get_model_schema``, or an
        ``EmptyPayload`` null schema when that call returns ``None``.
    """
    handler_model = call.model
    assert handler_model  # nosec B101

    # Build a model named after the handler's model from the flat parameters.
    payload_model = create_model(  # type: ignore[call-overload]
        handler_model.__name__,
        **call.flat_params,  # type: ignore[arg-type]
    )
    # Custom fields are passed to the schema helper as exclusions.
    excluded = tuple(call.custom_fields.keys())
    schema = get_model_schema(payload_model, prefix=prefix, exclude=excluded)

    return {"title": "EmptyPayload", "type": "null"} if schema is None else schema
|