Bases: Generic[MsgType, P_HandlerParams, T_HandlerReturn]
A generic class to wrap handler calls.
METHOD | DESCRIPTION |
__new__ | Create a new instance of the class |
__init__ | Initialize a handler |
__call__ | |
set_wrapped | Set the wrapped handler call |
call_wrapped | Call the wrapped function with the given message |
wait_call | Wait for the handler call to complete |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Initialize a handler.
PARAMETER | DESCRIPTION |
call | A callable object that represents the handler function. TYPE: Callable[P_HandlerParams, T_HandlerReturn] |
Source code in faststream/broker/wrapper.py
def __init__(
    self,
    call: Callable[P_HandlerParams, T_HandlerReturn],
):
    """Initialize a handler.

    Args:
        call: A callable object that represents the handler function.

    Attributes:
        _original_call: The original handler function.
        _wrapped_call: The wrapped handler function.
        _publishers: A list of publishers.
        mock: A MagicMock object.
        __name__: The name of the handler function.

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    if not isinstance(call, HandlerCallWrapper):
        self._original_call = call
        self._wrapped_call = None
        self._publishers = []
        self.mock = None
        self.future = None
        self.is_test = False
future instance-attribute
future: Optional[asyncio.Future[Any]]
is_test instance-attribute
is_test: bool
mock instance-attribute
mock: Optional[MagicMock]
call_wrapped
call_wrapped(
message: StreamMessage[MsgType],
) -> Union[
Optional[WrappedReturn[T_HandlerReturn]],
Awaitable[Optional[WrappedReturn[T_HandlerReturn]]],
]
Calls the wrapped function with the given message.
PARAMETER | DESCRIPTION |
message | The message to be passed to the wrapped function. TYPE: StreamMessage[MsgType] |
RETURNS | DESCRIPTION |
Union[Optional[WrappedReturn[T_HandlerReturn]], Awaitable[Optional[WrappedReturn[T_HandlerReturn]]]] | The result of the wrapped function call. |
RAISES | DESCRIPTION |
AssertionError | If set_wrapped has not been called before calling this function. |
AssertionError | If the handler is in test mode (is_test is True) but no mock has been set. |
Source code in faststream/broker/wrapper.py
def call_wrapped(
    self,
    message: StreamMessage[MsgType],
) -> Union[
    Optional[WrappedReturn[T_HandlerReturn]],
    Awaitable[Optional[WrappedReturn[T_HandlerReturn]]],
]:
    """Calls the wrapped function with the given message.

    Args:
        message: The message to be passed to the wrapped function.

    Returns:
        The result of the wrapped function call.

    Raises:
        AssertionError: If `set_wrapped` has not been called before calling this function.
        AssertionError: If the handler is in test mode but no mock has been set.

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    assert self._wrapped_call, "You should use `set_wrapped` first"  # nosec B101
    if self.is_test:
        assert self.mock  # nosec B101
        self.mock(message.decoded_body)
    return self._wrapped_call(message)
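The order of operations in call_wrapped can be illustrated with a minimal, standard-library-only sketch. `SketchWrapper` and `FakeMessage` are hypothetical stand-ins, not part of faststream: they keep only the attributes that call_wrapped touches (`_wrapped_call`, `is_test`, `mock`, and a message's `decoded_body`).

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional
from unittest.mock import MagicMock


@dataclass
class FakeMessage:
    """Hypothetical stand-in for StreamMessage; only decoded_body is modelled."""

    decoded_body: Any


class SketchWrapper:
    """Hypothetical, simplified stand-in mirroring the call_wrapped logic."""

    def __init__(self) -> None:
        self._wrapped_call: Optional[Callable[[FakeMessage], Any]] = None
        self.mock: Optional[MagicMock] = None
        self.is_test = False

    def set_wrapped(self, wrapped: Callable[[FakeMessage], Any]) -> None:
        self._wrapped_call = wrapped

    def call_wrapped(self, message: FakeMessage) -> Any:
        # 1. A wrapped call must have been registered first.
        assert self._wrapped_call, "You should use `set_wrapped` first"
        # 2. In test mode, the decoded body is recorded on the mock.
        if self.is_test:
            assert self.mock
            self.mock(message.decoded_body)
        # 3. The message is then handed to the wrapped call.
        return self._wrapped_call(message)


wrapper = SketchWrapper()

# Calling before set_wrapped trips the first assertion.
first_error = None
try:
    wrapper.call_wrapped(FakeMessage("hi"))
except AssertionError as exc:
    first_error = str(exc)

# Register a wrapped call and switch to test mode by hand.
wrapper.set_wrapped(lambda msg: msg.decoded_body.upper())
wrapper.is_test = True
wrapper.mock = MagicMock()
result = wrapper.call_wrapped(FakeMessage("hi"))
```

In this sketch the mock sees the decoded body, while the wrapped call receives the whole message object.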
refresh
refresh(with_mock: bool = False) -> None
Source code in faststream/broker/wrapper.py
def refresh(self, with_mock: bool = False) -> None:
    self.future = asyncio.Future()
    if with_mock and self.mock is not None:
        self.mock.reset_mock()
reset_test
Source code in faststream/broker/wrapper.py
def reset_test(self) -> None:
    self.is_test = False
    self.mock = None
    self.future = None
set_test
Source code in faststream/broker/wrapper.py
def set_test(self) -> None:
    self.is_test = True
    if self.mock is None:
        self.mock = MagicMock()
    self.refresh(with_mock=True)
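refresh, reset_test and set_test carry no docstrings, so the following sketch traces how they appear to interact, based only on the source listings above. `ModeState` is a hypothetical stand-in class that copies the three method bodies. The futures are created inside a running event loop, which is an assumption about how the real methods are normally called.

```python
import asyncio
from typing import Any, Optional
from unittest.mock import MagicMock


class ModeState:
    """Hypothetical stand-in holding only the test-mode state of the wrapper."""

    def __init__(self) -> None:
        self.mock: Optional[MagicMock] = None
        self.future: Optional["asyncio.Future[Any]"] = None
        self.is_test = False

    def refresh(self, with_mock: bool = False) -> None:
        # A fresh future is always created; the mock is reset only on request.
        self.future = asyncio.Future()
        if with_mock and self.mock is not None:
            self.mock.reset_mock()

    def set_test(self) -> None:
        self.is_test = True
        if self.mock is None:
            self.mock = MagicMock()
        self.refresh(with_mock=True)

    def reset_test(self) -> None:
        self.is_test = False
        self.mock = None
        self.future = None


async def demo():
    state = ModeState()
    state.set_test()
    first_mock = state.mock
    state.mock("payload")
    calls_before = state.mock.call_count

    # A second set_test keeps the same mock object but clears its call history.
    state.set_test()
    same_mock = state.mock is first_mock
    calls_after = state.mock.call_count

    # reset_test drops all test-mode state.
    state.reset_test()
    return calls_before, same_mock, calls_after, state.mock, state.future, state.is_test


lifecycle = asyncio.run(demo())
```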
set_wrapped
set_wrapped(
wrapped: WrappedHandlerCall[MsgType, T_HandlerReturn]
) -> None
Set the wrapped handler call.
PARAMETER | DESCRIPTION |
wrapped | The wrapped handler call to set TYPE: WrappedHandlerCall[MsgType, T_HandlerReturn] |
Source code in faststream/broker/wrapper.py
def set_wrapped(
    self, wrapped: WrappedHandlerCall[MsgType, T_HandlerReturn]
) -> None:
    """Set the wrapped handler call.

    Args:
        wrapped: The wrapped handler call to set

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    self._wrapped_call = wrapped
trigger
trigger(
result: Any = None,
error: Optional[BaseException] = None,
) -> None
Source code in faststream/broker/wrapper.py
def trigger(
    self,
    result: Any = None,
    error: Optional[BaseException] = None,
) -> None:
    if not self.is_test:
        return
    assert (  # nosec B101
        self.future is not None
    ), "You can use this method only with TestClient"
    if self.future.done():
        self.future = asyncio.Future()
    if error:
        self.future.set_exception(error)
    else:
        self.future.set_result(result)
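As a rough illustration of the trigger logic, the sketch below reproduces its future handling on a hypothetical `FutureHolder` class that is not part of faststream. It shows three behaviours visible in the listing: trigger is a no-op outside test mode, it stores a result or an exception on the future, and it replaces a future that is already done.

```python
import asyncio
from typing import Any, Optional


class FutureHolder:
    """Hypothetical stand-in keeping only the future handling of trigger."""

    def __init__(self) -> None:
        self.is_test = False
        self.future: Optional["asyncio.Future[Any]"] = None

    def trigger(
        self,
        result: Any = None,
        error: Optional[BaseException] = None,
    ) -> None:
        if not self.is_test:
            return
        assert self.future is not None, "You can use this method only with TestClient"
        if self.future.done():
            # A completed future cannot be set again, so a new one is created.
            self.future = asyncio.Future()
        if error:
            self.future.set_exception(error)
        else:
            self.future.set_result(result)


async def demo():
    holder = FutureHolder()

    # Outside test mode nothing happens, even though no future exists yet.
    holder.trigger("ignored")
    untouched = holder.future is None

    # Emulate test mode by hand (the real class does this in set_test).
    holder.is_test = True
    holder.future = asyncio.Future()
    holder.trigger("first")
    first = await holder.future

    # The previous future is done, so the error lands on a fresh one.
    holder.trigger(error=ValueError("boom"))
    second = None
    try:
        await holder.future
    except ValueError as exc:
        second = str(exc)
    return untouched, first, second


outcome = asyncio.run(demo())
```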
wait_call async
wait_call(timeout: Optional[float] = None) -> None
Waits for a call with an optional timeout.
PARAMETER | DESCRIPTION |
timeout | Optional timeout in seconds TYPE: Optional[float] DEFAULT: None |
Source code in faststream/broker/wrapper.py
async def wait_call(self, timeout: Optional[float] = None) -> None:
    """Waits for a call with an optional timeout.

    Args:
        timeout: Optional timeout in seconds

    Raises:
        AssertionError: If no future has been set, i.e. the handler is used outside of TestClient
        TimeoutError: If the call does not complete within `timeout` seconds

    Returns:
        None

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    assert (  # nosec B101
        self.future is not None
    ), "You can use this method only with TestClient"
    with anyio.fail_after(timeout):
        await self.future
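The waiting pattern can be sketched with the standard library alone. Note the substitution: the real method uses `anyio.fail_after`, whereas this sketch uses `asyncio.wait_for` so that it runs without extra dependencies. The free function `wait_call` here is a hypothetical stand-in that takes the future as an argument instead of reading `self.future`.

```python
import asyncio
from typing import Any, Optional


async def wait_call(
    future: Optional["asyncio.Future[Any]"],
    timeout: Optional[float] = None,
) -> None:
    """Hypothetical stand-in for the method; asyncio.wait_for replaces anyio.fail_after."""
    assert future is not None, "You can use this method only with TestClient"
    await asyncio.wait_for(future, timeout)


async def demo():
    loop = asyncio.get_running_loop()

    # Nothing ever completes this future, so the wait times out.
    pending = loop.create_future()
    try:
        await wait_call(pending, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    # This future is completed shortly after the wait starts.
    ready = loop.create_future()
    loop.call_later(0.01, ready.set_result, "done")
    await wait_call(ready, timeout=1.0)
    return timed_out, ready.result()


waited = asyncio.run(demo())
```

With `timeout=None` both the sketch and the original wait indefinitely, so tests usually pass an explicit timeout.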