Bases: Generic[MsgType, P_HandlerParams, T_HandlerReturn]
A generic class to wrap handler calls.
Initialize a handler.
Source code in faststream/broker/wrapper/call.py
| def __init__(
self,
call: Callable[P_HandlerParams, T_HandlerReturn],
) -> None:
"""Initialize a handler."""
if not isinstance(call, HandlerCallWrapper):
self._original_call = call
self._wrapped_call = None
self._publishers = []
self.mock = None
self.future = None
self.is_test = False
|
future instance-attribute
is_test instance-attribute
call_wrapped
Calls the wrapped function with the given message.
Source code in faststream/broker/wrapper/call.py
| def call_wrapped(
self,
message: "StreamMessage[MsgType]",
) -> Awaitable[Any]:
"""Calls the wrapped function with the given message."""
assert self._wrapped_call, "You should use `set_wrapped` first" # nosec B101
if self.is_test:
assert self.mock # nosec B101
self.mock(message._decoded_body)
return self._wrapped_call(message)
|
wait_call async
Waits for a call with an optional timeout.
Source code in faststream/broker/wrapper/call.py
| async def wait_call(self, timeout: Optional[float] = None) -> None:
"""Waits for a call with an optional timeout."""
assert ( # nosec B101
self.future is not None
), "You can use this method only with TestClient"
with anyio.fail_after(timeout):
await self.future
|
set_test
Source code in faststream/broker/wrapper/call.py
| def set_test(self) -> None:
self.is_test = True
if self.mock is None:
self.mock = MagicMock()
self.refresh(with_mock=True)
|
reset_test
Source code in faststream/broker/wrapper/call.py
| def reset_test(self) -> None:
self.is_test = False
self.mock = None
self.future = None
|
trigger
trigger(result=None, error=None)
Source code in faststream/broker/wrapper/call.py
| def trigger(
self,
result: Any = None,
error: Optional[BaseException] = None,
) -> None:
if not self.is_test:
return
if self.future is None:
raise SetupError("You can use this method only with TestClient")
if self.future.done():
self.future = asyncio.Future()
if error:
self.future.set_exception(error)
else:
self.future.set_result(result)
|
refresh
Source code in faststream/broker/wrapper/call.py
| def refresh(self, with_mock: bool = False) -> None:
if asyncio.events._get_running_loop() is not None:
self.future = asyncio.Future()
if with_mock and self.mock is not None:
self.mock.reset_mock()
|
set_wrapped
set_wrapped(*, apply_types, is_validate, dependencies, _get_dependant, _call_decorators)
Source code in faststream/broker/wrapper/call.py
| def set_wrapped(
self,
*,
apply_types: bool,
is_validate: bool,
dependencies: Iterable["Depends"],
_get_dependant: Optional[Callable[..., Any]],
_call_decorators: Iterable["Decorator"],
) -> Optional["CallModel[..., Any]"]:
call = self._original_call
for decor in _call_decorators:
call = decor(call)
self._original_call = call
f: Callable[..., Awaitable[Any]] = to_async(call)
dependent: Optional[CallModel[..., Any]] = None
if _get_dependant is None:
dependent = build_call_model(
f,
cast=is_validate,
extra_dependencies=dependencies, # type: ignore[arg-type]
)
if apply_types:
wrapper: _InjectWrapper[Any, Any] = inject(func=None)
f = wrapper(func=f, model=dependent)
f = _wrap_decode_message(
func=f,
params_ln=len(dependent.flat_params),
)
self._wrapped_call = f
return dependent
|