Skip to content

HandlerCallWrapper

faststream.broker.wrapper.HandlerCallWrapper #

HandlerCallWrapper(call: Callable[P_HandlerParams, T_HandlerReturn])

Bases: Generic[MsgType, P_HandlerParams, T_HandlerReturn]

A generic class to wrap handler calls.

METHOD DESCRIPTION
__new__

Create a new instance of the class

__init__

Initialize the instance

__call__

Call the wrapped handler

set_wrapped

Set the wrapped handler call

call_wrapped

Call the wrapped handler

wait_call

Wait for the handler call to complete

Initialize a handler.

PARAMETER DESCRIPTION
call

A callable object that represents the handler function.

TYPE: Callable[P_HandlerParams, T_HandlerReturn]

Source code in faststream/broker/wrapper.py
def __init__(
    self,
    call: Callable[P_HandlerParams, T_HandlerReturn],
) -> None:
    """Initialize a handler.

    Args:
        call: A callable object that represents the handler function.

    Attributes:
        _original_call: The original handler function.
        _wrapped_call: The wrapped handler function.
        _publishers: A list of publishers.
        mock: A MagicMock object.
        __name__: The name of the handler function.

    """
    if not isinstance(call, HandlerCallWrapper):
        self._original_call = call
        self._wrapped_call = None
        self._publishers = []

        self.mock = None
        self.future = None
        self.is_test = False

future instance-attribute #

future: Optional[Future[Any]]

is_test instance-attribute #

is_test: bool

mock instance-attribute #

call_wrapped #

call_wrapped(message: StreamMessage[MsgType]) -> Union[Optional[WrappedReturn[T_HandlerReturn]], Awaitable[Optional[WrappedReturn[T_HandlerReturn]]]]

Calls the wrapped function with the given message.

PARAMETER DESCRIPTION
message

The message to be passed to the wrapped function.

TYPE: StreamMessage[MsgType]

RETURNS DESCRIPTION
Union[Optional[WrappedReturn[T_HandlerReturn]], Awaitable[Optional[WrappedReturn[T_HandlerReturn]]]]

The result of the wrapped function call.

RAISES DESCRIPTION
AssertionError

If set_wrapped has not been called before calling this function.

AssertionError

If the broker has not been started before calling this function.

Source code in faststream/broker/wrapper.py
def call_wrapped(
    self,
    message: StreamMessage[MsgType],
) -> Union[
    Optional[WrappedReturn[T_HandlerReturn]],
    Awaitable[Optional[WrappedReturn[T_HandlerReturn]]],
]:
    """Calls the wrapped function with the given message.

    Args:
        message: The message to be passed to the wrapped function.

    Returns:
        The result of the wrapped function call.

    Raises:
        AssertionError: If `set_wrapped` has not been called before calling this function.
        AssertionError: If the broker has not been started before calling this function.

    """
    assert self._wrapped_call, "You should use `set_wrapped` first"  # nosec B101
    if self.is_test:
        assert self.mock  # nosec B101
        self.mock(message.decoded_body)
    return self._wrapped_call(message)

refresh #

refresh(with_mock: bool = False) -> None
Source code in faststream/broker/wrapper.py
def refresh(self, with_mock: bool = False) -> None:
    self.future = asyncio.Future()
    if with_mock and self.mock is not None:
        self.mock.reset_mock()

reset_test #

reset_test() -> None
Source code in faststream/broker/wrapper.py
def reset_test(self) -> None:
    self.is_test = False
    self.mock = None
    self.future = None

set_test #

set_test() -> None
Source code in faststream/broker/wrapper.py
def set_test(self) -> None:
    self.is_test = True
    if self.mock is None:
        self.mock = MagicMock()
    self.refresh(with_mock=True)

set_wrapped #

set_wrapped(wrapped: WrappedHandlerCall[MsgType, T_HandlerReturn]) -> None

Set the wrapped handler call.

PARAMETER DESCRIPTION
wrapped

The wrapped handler call to set

TYPE: WrappedHandlerCall[MsgType, T_HandlerReturn]

Source code in faststream/broker/wrapper.py
def set_wrapped(
    self, wrapped: WrappedHandlerCall[MsgType, T_HandlerReturn]
) -> None:
    """Set the wrapped handler call.

    Args:
        wrapped: The wrapped handler call to set

    """
    self._wrapped_call = wrapped

trigger #

trigger(result: Any = None, error: Optional[BaseException] = None) -> None
Source code in faststream/broker/wrapper.py
def trigger(
    self,
    result: Any = None,
    error: Optional[BaseException] = None,
) -> None:
    if not self.is_test:
        return

    assert (  # nosec B101
        self.future is not None
    ), "You can use this method only with TestClient"

    if self.future.done():
        self.future = asyncio.Future()

    if error:
        self.future.set_exception(error)
    else:
        self.future.set_result(result)

wait_call async #

wait_call(timeout: Optional[float] = None) -> None

Waits for a call with an optional timeout.

PARAMETER DESCRIPTION
timeout

Optional timeout in seconds

TYPE: Optional[float] DEFAULT: None

RAISES DESCRIPTION
AssertionError

If the broker is not started

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/wrapper.py
async def wait_call(self, timeout: Optional[float] = None) -> None:
    """Waits for a call with an optional timeout.

    Args:
        timeout: Optional timeout in seconds

    Raises:
        AssertionError: If the broker is not started

    Returns:
        None

    """
    assert (  # nosec B101
        self.future is not None
    ), "You can use this method only with TestClient"
    with anyio.fail_after(timeout):
        await self.future