call_handler

faststream.broker.test.call_handler async

call_handler(
    handler: AsyncHandler[Any],
    message: Any,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[SendableMessage]

Asynchronously call a handler function.

PARAMETER DESCRIPTION
handler

The handler function to be called.

TYPE: AsyncHandler[Any]

message

The message to be passed to the handler function.

TYPE: Any

rpc

Whether the call is a remote procedure call (RPC). If True, the handler's result is returned; otherwise it is discarded and None is returned.

TYPE: bool DEFAULT: False

rpc_timeout

The timeout for the handler call, in seconds. The timeout scope wraps the call whether or not rpc is True.

TYPE: Optional[float] DEFAULT: 30.0

raise_timeout

Whether to raise a timeout error if the RPC times out.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[SendableMessage]

The result of the handler function if rpc is True, otherwise None.

RAISES DESCRIPTION
TimeoutError

If the RPC times out and raise_timeout is True.
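
The effect of rpc on the return value can be illustrated without a broker. The following is a minimal sketch, not the library's code: EchoHandler and call_handler_sketch are hypothetical stand-ins, the handler is assumed to expose only an async consume() method, and timeout handling is left out for brevity.

```python
import asyncio
from typing import Any, List, Optional


class EchoHandler:
    """Hypothetical stand-in exposing the async consume() method the function relies on."""

    def __init__(self) -> None:
        self.seen: List[Any] = []

    async def consume(self, message: Any) -> str:
        self.seen.append(message)
        return f"echo: {message}"


async def call_handler_sketch(
    handler: EchoHandler, message: Any, rpc: bool = False
) -> Optional[str]:
    # The handler is always awaited; rpc only decides whether
    # its result is handed back to the caller.
    result = await handler.consume(message)
    return result if rpc else None


async def main():
    handler = EchoHandler()
    fire_and_forget = await call_handler_sketch(handler, "ping")
    reply = await call_handler_sketch(handler, "ping", rpc=True)
    return handler.seen, fire_and_forget, reply


seen, fire_and_forget, reply = asyncio.run(main())
```

In both calls the handler runs and records the message; only the rpc=True call hands the handler's return value back.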

Source code in faststream/broker/test.py
async def call_handler(
    handler: AsyncHandler[Any],
    message: Any,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[SendableMessage]:
    """Asynchronously call a handler function.

    Args:
        handler: The handler function to be called.
        message: The message to be passed to the handler function.
        rpc: Whether the call is a remote procedure call (RPC).
        rpc_timeout: The timeout for the RPC, in seconds.
        raise_timeout: Whether to raise a timeout error if the RPC times out.

    Returns:
        The result of the handler function if `rpc` is True, otherwise None.

    Raises:
        TimeoutError: If the RPC times out and `raise_timeout` is True.

    """
    with timeout_scope(rpc_timeout, raise_timeout):
        result = await handler.consume(message)

        if rpc is True:
            return result

    return None
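
The body of timeout_scope is not shown on this page. The sketch below assumes it cancels the wrapped block once rpc_timeout elapses, then raises if raise_timeout is True and exits silently otherwise. It approximates that behavior with asyncio.wait_for instead of the library's own helper; SlowHandler and call_with_timeout are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, Optional


class SlowHandler:
    """Hypothetical handler whose consume() takes longer than the timeout."""

    async def consume(self, message: Any) -> str:
        await asyncio.sleep(0.2)
        return f"done: {message}"


async def call_with_timeout(
    handler: SlowHandler,
    message: Any,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[str]:
    try:
        # timeout=None makes wait_for wait indefinitely.
        result = await asyncio.wait_for(handler.consume(message), timeout=rpc_timeout)
    except asyncio.TimeoutError:
        if raise_timeout:
            raise
        # Assumed behavior: the timeout is swallowed and None is returned.
        return None
    return result if rpc else None


async def main():
    handler = SlowHandler()
    quiet = await call_with_timeout(handler, "ping", rpc=True, rpc_timeout=0.01)
    try:
        await call_with_timeout(
            handler, "ping", rpc=True, rpc_timeout=0.01, raise_timeout=True
        )
        raised = False
    except asyncio.TimeoutError:
        raised = True
    return quiet, raised


quiet, raised = asyncio.run(main())
```

Under these assumptions, a timed-out call with raise_timeout=False is indistinguishable from a handler that returned None, so tests that need to detect a timeout would set raise_timeout=True.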