resolve_payloads(
payloads: List[Tuple[AnyDict, str]],
extra: str = "",
served_words: int = 1,
) -> AnyDict
Resolve payloads.
PARAMETER | DESCRIPTION |
payloads | A list of dictionaries representing payloads. TYPE: List[Tuple[AnyDict, str]] |
RETURNS | DESCRIPTION |
AnyDict | A dictionary representing the resolved payload. |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/asyncapi/utils.py
| def resolve_payloads(
payloads: List[Tuple[AnyDict, str]],
extra: str = "",
served_words: int = 1,
) -> AnyDict:
"""Resolve payloads.
Args:
payloads: A list of dictionaries representing payloads.
Returns:
A dictionary representing the resolved payload.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
ln = len(payloads)
payload: AnyDict
if ln > 1:
one_of_payloads = {}
for body, handler_name in payloads:
title = body["title"]
words = title.split(":")
if len(words) > 1: # not pydantic model case
body["title"] = title = ":".join(
filter(
lambda x: bool(x),
(
handler_name,
extra if extra not in words else "",
*words[served_words:],
),
)
)
one_of_payloads[title] = body
payload = {"oneOf": one_of_payloads}
elif ln == 1:
payload = payloads[0][0]
else:
payload = {}
return payload
|