Skip to content

timeout_scope

faststream.utils.functions.timeout_scope #

timeout_scope(
    timeout: Optional[float] = 30,
    raise_timeout: bool = False,
) -> ContextManager[anyio.CancelScope]
Source code in faststream/utils/functions.py
def timeout_scope(
    timeout: Optional[float] = 30,
    raise_timeout: bool = False,
) -> ContextManager[anyio.CancelScope]:
    scope: Callable[[Optional[float]], ContextManager[anyio.CancelScope]]
    if raise_timeout:
        scope = anyio.fail_after
    else:
        scope = anyio.move_on_after

    return scope(timeout)

Last update: 2023-11-13