Skip to content

set_exit

faststream.cli.supervisors.utils.set_exit #

set_exit(func, *, sync=False)

Set exit handler for signals.

PARAMETER DESCRIPTION
func

A callable object that takes an integer and an optional frame type as arguments and returns any value.

TYPE: Callable[[int, Optional[FrameType]], Any]

sync

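If True, install the handler with signal.signal; if False (the default), register it on the asyncio event loop, falling back to signal.signal when the loop does not support signal handlers.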

TYPE: bool DEFAULT: False

Source code in faststream/cli/supervisors/utils.py
def set_exit(
    func: Callable[[int, Optional["FrameType"]], Any],
    *,
    sync: bool = False,
) -> None:
    """Install ``func`` as the handler for every signal in ``HANDLED_SIGNALS``.

    In async mode (the default) the handler is registered on the asyncio
    event loop and is invoked as ``func(signum, None)``. If the loop raises
    ``NotImplementedError`` for signal handlers, or if ``sync`` is true, the
    handler is installed through ``signal.signal`` instead.

    Args:
        func: Callback accepting the signal number and an optional frame.
        sync: When true, skip the event loop and use ``signal.signal`` directly.
    """
    if not sync:
        try:
            event_loop = asyncio.get_event_loop()

            for signum in HANDLED_SIGNALS:
                # The loop calls the callback with the extra args given here,
                # so pass the signal number and a ``None`` frame explicitly.
                event_loop.add_signal_handler(signum, func, signum, None)
        except NotImplementedError:
            # Loop-level signal handlers are unavailable (e.g. on Windows);
            # fall through to the plain ``signal.signal`` registration below.
            pass
        else:
            return

    # Sync mode, or the event loop could not take signal handlers.
    for signum in HANDLED_SIGNALS:
        signal.signal(signum, func)