Set exit handler for signals.
PARAMETER | DESCRIPTION |
func | The signal handler: a callable that receives the signal number (an integer) and an optional frame object, and may return any value. TYPE: Callable[[int, Optional[FrameType]], Any] |
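sync | If True, register the handler directly with `signal.signal`; if False, register it on the asyncio event loop, falling back to `signal.signal` when the loop does not support signal handlers. TYPE: bool DEFAULT: False |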
Source code in faststream/cli/supervisors/utils.py
def set_exit(
    func: Callable[[int, Optional["FrameType"]], Any],
    *,
    sync: bool = False,
) -> None:
    """Register ``func`` as the handler for every signal in ``HANDLED_SIGNALS``.

    In async mode (the default) the handler is attached to the asyncio event
    loop, which will invoke it as ``func(sig, None)``. If the loop raises
    ``NotImplementedError`` for signal handlers, or if ``sync`` is true, the
    handler is installed with ``signal.signal`` instead.

    Args:
        func: Callable receiving the signal number and an optional frame.
        sync: When true, skip the event loop and use ``signal.signal`` directly.
    """
    if not sync:
        try:
            event_loop = asyncio.get_event_loop()
            for signum in HANDLED_SIGNALS:
                event_loop.add_signal_handler(signum, func, signum, None)
        except NotImplementedError:
            # The loop cannot install signal handlers; use the fallback below.
            pass
        else:
            return

    # Windows or sync mode: install plain process-level handlers.
    for signum in HANDLED_SIGNALS:
        signal.signal(signum, func)