A base class for implementing a reloader process.
Source code in faststream/cli/supervisors/basereload.py
| def __init__(
self,
target: "DecoratedCallable",
args: Tuple[Any, ...],
reload_delay: Optional[float] = 0.5,
) -> None:
self._target = target
self._args = args
self.should_exit = threading.Event()
self.pid = os.getpid()
self.reload_delay = reload_delay
set_exit(lambda *_: self.should_exit.set(), sync=True)
|
reloader_name class-attribute
instance-attribute
should_exit instance-attribute
reload_delay instance-attribute
run
Source code in faststream/cli/supervisors/basereload.py
| def run(self) -> None:
self.startup()
while not self.should_exit.wait(self.reload_delay):
if self.should_restart(): # pragma: no branch
self.restart()
self.shutdown()
|
startup
Source code in faststream/cli/supervisors/basereload.py
| def startup(self) -> None:
logger.info(f"Started reloader process [{self.pid}] using {self.reloader_name}")
self._process = self._start_process()
|
restart
Source code in faststream/cli/supervisors/basereload.py
| def restart(self) -> None:
self._stop_process()
logger.info("Process successfully reloaded")
self._process = self._start_process()
|
shutdown
Source code in faststream/cli/supervisors/basereload.py
| def shutdown(self) -> None:
self._stop_process()
logger.info(f"Stopping reloader process [{self.pid}]")
|
should_restart
Source code in faststream/cli/supervisors/basereload.py
| def should_restart(self) -> bool:
raise NotImplementedError("Reload strategies should override should_restart()")
|