Skip to content

resolve_custom_func

faststream.broker.parsers.resolve_custom_func #

resolve_custom_func(
    custom_func: Optional[
        Union[
            CustomDecoder[StreamMsg],
            CustomParser[MsgType, StreamMsg],
        ]
    ],
    default_func: Union[
        Decoder[StreamMsg], Parser[MsgType, StreamMsg]
    ],
) -> Union[Decoder[StreamMsg], Parser[MsgType, StreamMsg]]

Resolve a custom function.

PARAMETER DESCRIPTION
custom_func

Optional custom function of type CustomDecoder or CustomParser.

TYPE: Optional[Union[CustomDecoder[StreamMsg], CustomParser[MsgType, StreamMsg]]]

default_func

Default function of type Decoder or Parser.

TYPE: Union[Decoder[StreamMsg], Parser[MsgType, StreamMsg]]

RETURNS DESCRIPTION
Union[Decoder[StreamMsg], Parser[MsgType, StreamMsg]]

The resolved function of type Decoder or Parser.

Source code in faststream/broker/parsers.py
def resolve_custom_func(  # type: ignore[misc]
    custom_func: Optional[
        Union[CustomDecoder[StreamMsg], CustomParser[MsgType, StreamMsg]]
    ],
    default_func: Union[Decoder[StreamMsg], Parser[MsgType, StreamMsg]],
) -> Union[Decoder[StreamMsg], Parser[MsgType, StreamMsg]]:
    """Resolve a custom function.

    Args:
        custom_func: Optional custom function of type CustomDecoder or CustomParser.
        default_func: Default function of type Decoder or Parser.

    Returns:
        The resolved function of type Decoder or Parser.

    """
    if custom_func is None:
        return default_func

    original_params = inspect.signature(custom_func).parameters
    if len(original_params) == 1:
        return cast(Union[Decoder[StreamMsg], Parser[MsgType, StreamMsg]], custom_func)

    else:
        name = tuple(original_params.items())[1][0]
        return partial(custom_func, **{name: default_func})  # type: ignore