Skip to content

timeout_scope

faststream.utils.functions.timeout_scope

timeout_scope(
    timeout: Optional[float] = 30,
    raise_timeout: bool = False,
) -> ContextManager[CancelScope]
Source code in faststream/utils/functions.py
def timeout_scope(
    timeout: Optional[float] = 30,
    raise_timeout: bool = False,
) -> ContextManager[anyio.CancelScope]:
    """Create an anyio timeout context manager for the given duration.

    Args:
        timeout: Value forwarded unchanged to the selected anyio factory.
            Presumably a duration in seconds, with ``None`` meaning no
            deadline (see the anyio documentation to confirm).
        raise_timeout: When truthy, ``anyio.fail_after`` is used; otherwise
            ``anyio.move_on_after`` is used.

    Returns:
        The context manager produced by calling the selected anyio factory
        with ``timeout``.
    """
    factory: Callable[[Optional[float]], ContextManager[anyio.CancelScope]]

    # Both factories accept the same single argument; only the choice between
    # them depends on the flag.
    if raise_timeout:
        factory = anyio.fail_after  # type: ignore[assignment]
    else:
        factory = anyio.move_on_after  # type: ignore[assignment]

    return factory(timeout)