Skip to content

resolve_custom_func

faststream.broker.utils.resolve_custom_func #

resolve_custom_func(custom_func, default_func)

Resolve a custom parser/decoder with default one.

Source code in faststream/broker/utils.py
def resolve_custom_func(
    custom_func: Optional["CustomCallable"],
    default_func: "AsyncCallable",
) -> "AsyncCallable":
    """Resolve a custom parser/decoder with default one."""
    if custom_func is None:
        return default_func

    original_params = inspect.signature(custom_func).parameters

    if len(original_params) == 1:
        return to_async(cast(Union["SyncCallable", "AsyncCallable"], custom_func))

    else:
        name = tuple(original_params.items())[1][0]
        return partial(to_async(custom_func), **{name: default_func})