Skip to content

HandlerCallWrapper

faststream.broker.wrapper.call.HandlerCallWrapper #

HandlerCallWrapper(call)

Bases: Generic[MsgType, P_HandlerParams, T_HandlerReturn]

A generic class to wrap handler calls.

Initialize a handler.

Source code in faststream/broker/wrapper/call.py
def __init__(
    self,
    call: Callable[P_HandlerParams, T_HandlerReturn],
) -> None:
    """Set up the wrapper state around ``call``.

    When ``call`` is already a ``HandlerCallWrapper`` nothing is assigned
    here, so no attribute is (re)initialised by this method in that case.

    Args:
        call: The handler function to wrap.
    """
    if isinstance(call, HandlerCallWrapper):
        # NOTE(review): presumably the existing wrapper is reused by
        # construction logic outside this method - confirm.
        return

    # Wrapped-call bookkeeping.
    self._original_call = call
    self._wrapped_call = None
    self._publishers = []

    # Testing helpers; all start out disabled/unset.
    self.mock = None
    self.future = None
    self.is_test = False

mock instance-attribute #

mock

future instance-attribute #

future

is_test instance-attribute #

is_test

call_wrapped #

call_wrapped(message)

Calls the wrapped function with the given message.

Source code in faststream/broker/wrapper/call.py
def call_wrapped(
    self,
    message: "StreamMessage[MsgType]",
) -> Awaitable[Any]:
    """Invoke the prepared handler with ``message``.

    In test mode the decoded message body is first passed to ``self.mock``
    so the call is recorded before the handler runs.

    Args:
        message: The incoming message forwarded to the wrapped call.

    Returns:
        Whatever the wrapped call returns for ``message``.

    Raises:
        AssertionError: If no wrapped call has been set, or if test mode
            is on while ``self.mock`` is unset.
    """
    assert self._wrapped_call, "You should use `set_wrapped` first"  # nosec B101

    if self.is_test:
        recorder = self.mock
        assert recorder  # nosec B101
        recorder(message._decoded_body)

    outcome = self._wrapped_call(message)
    return outcome

wait_call async #

wait_call(timeout=None)

Waits for a call with an optional timeout.

Source code in faststream/broker/wrapper/call.py
async def wait_call(self, timeout: Optional[float] = None) -> None:
    """Block until the test future completes, optionally with a deadline.

    Args:
        timeout: Value handed to ``anyio.fail_after``; ``None`` by default.

    Raises:
        AssertionError: If ``self.future`` has not been created.
    """
    pending = self.future
    assert pending is not None, "You can use this method only with TestClient"  # nosec B101

    with anyio.fail_after(timeout):
        await pending

set_test #

set_test()
Source code in faststream/broker/wrapper/call.py
def set_test(self) -> None:
    """Switch the wrapper into test mode.

    Creates a ``MagicMock`` when no mock exists yet, flags the wrapper as
    being under test, then calls ``refresh(with_mock=True)``.
    """
    # Keep an already-installed mock; only create one when missing.
    if self.mock is None:
        self.mock = MagicMock()

    self.is_test = True
    self.refresh(with_mock=True)

reset_test #

reset_test()
Source code in faststream/broker/wrapper/call.py
def reset_test(self) -> None:
    """Leave test mode and drop the mock and the pending future."""
    self.is_test = False
    self.mock = self.future = None

trigger #

trigger(result=None, error=None)
Source code in faststream/broker/wrapper/call.py
def trigger(
    self,
    result: Any = None,
    error: Optional[BaseException] = None,
) -> None:
    """Resolve the test future with either a result or an error.

    Does nothing unless the wrapper is in test mode. If the current future
    is already done, it is replaced by a new one before being resolved.

    Args:
        result: Value stored as the future's result when ``error`` is None.
        error: Exception stored on the future; when given it is used
            instead of ``result``.

    Raises:
        SetupError: If test mode is on but no future has been created.
    """
    if not self.is_test:
        return

    if self.future is None:
        raise SetupError("You can use this method only with TestClient")

    # A completed future cannot be resolved again, so start a new one.
    if self.future.done():
        self.future = asyncio.Future()

    # Check presence, not truthiness: an exception class may define
    # ``__bool__``/``__len__`` and evaluate as falsy, which would otherwise
    # silently turn the error into a successful ``result``.
    if error is not None:
        self.future.set_exception(error)
    else:
        self.future.set_result(result)

refresh #

refresh(with_mock=False)
Source code in faststream/broker/wrapper/call.py
def refresh(self, with_mock: bool = False) -> None:
    """Create a new pending future and optionally reset the mock.

    A new future is only created while an asyncio event loop is running;
    otherwise ``self.future`` is left untouched.

    Args:
        with_mock: When true and a mock is installed, its recorded calls
            are cleared via ``reset_mock()``.
    """
    # Use the public API rather than the private ``_get_running_loop``;
    # it raises RuntimeError when no loop is running.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is not None:
        # Bind the future explicitly to the running loop.
        self.future = loop.create_future()

    if with_mock and self.mock is not None:
        self.mock.reset_mock()

set_wrapped #

set_wrapped(*, apply_types, is_validate, dependencies, _get_dependant, _call_decorators)
Source code in faststream/broker/wrapper/call.py
def set_wrapped(
    self,
    *,
    apply_types: bool,
    is_validate: bool,
    dependencies: Iterable["Depends"],
    _get_dependant: Optional[Callable[..., Any]],
    _call_decorators: Iterable["Decorator"],
) -> Optional["CallModel[..., Any]"]:
    """Build the callable used by ``call_wrapped`` and store it.

    The original call is passed through every decorator in
    ``_call_decorators`` (in iteration order), stored back as the original
    call, and converted with ``to_async``. When ``_get_dependant`` is None a
    call model is built, type injection is applied if requested, and the
    result is wrapped with ``_wrap_decode_message``.

    Args:
        apply_types: Whether to wrap the call with ``inject``.
        is_validate: Passed to ``build_call_model`` as ``cast``.
        dependencies: Passed to ``build_call_model`` as extra dependencies.
        _get_dependant: When not None, no call model is built here and the
            async call is stored as is.
        _call_decorators: Decorators applied to the original call.

    Returns:
        The built call model, or None when ``_get_dependant`` was provided.
    """
    decorated = self._original_call
    for decorate in _call_decorators:
        decorated = decorate(decorated)
    # The decorated callable replaces the stored original.
    self._original_call = decorated

    handler: Callable[..., Awaitable[Any]] = to_async(decorated)

    call_model: Optional[CallModel[..., Any]] = None
    if _get_dependant is None:
        call_model = build_call_model(
            handler,
            cast=is_validate,
            extra_dependencies=dependencies,  # type: ignore[arg-type]
        )

        if apply_types:
            handler = inject(func=None)(func=handler, model=call_model)

        handler = _wrap_decode_message(
            func=handler,
            params_ln=len(call_model.flat_params),
        )

    self._wrapped_call = handler
    return call_model