Skip to content

BaseReload

faststream.cli.supervisors.basereload.BaseReload #

BaseReload(target, args, reload_delay=0.5)

A base class for implementing a reloader process.

Source code in faststream/cli/supervisors/basereload.py
def __init__(
    self,
    target: "DecoratedCallable",
    args: Tuple[Any, ...],
    reload_delay: Optional[float] = 0.5,
) -> None:
    self._target = target
    self._args = args

    self.should_exit = threading.Event()
    self.pid = os.getpid()
    self.reload_delay = reload_delay

    set_exit(lambda *_: self.should_exit.set(), sync=True)

reloader_name class-attribute instance-attribute #

reloader_name = ''

should_exit instance-attribute #

should_exit = Event()

pid instance-attribute #

pid = getpid()

reload_delay instance-attribute #

reload_delay = reload_delay

run #

run()
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    self.startup()
    while not self.should_exit.wait(self.reload_delay):
        if self.should_restart():  # pragma: no branch
            self.restart()
    self.shutdown()

startup #

startup()
Source code in faststream/cli/supervisors/basereload.py
def startup(self) -> None:
    logger.info(f"Started reloader process [{self.pid}] using {self.reloader_name}")
    self._process = self._start_process()

restart #

restart()
Source code in faststream/cli/supervisors/basereload.py
def restart(self) -> None:
    self._stop_process()
    logger.info("Process successfully reloaded")
    self._process = self._start_process()

shutdown #

shutdown()
Source code in faststream/cli/supervisors/basereload.py
def shutdown(self) -> None:
    self._stop_process()
    logger.info(f"Stopping reloader process [{self.pid}]")

should_restart #

should_restart()
Source code in faststream/cli/supervisors/basereload.py
def should_restart(self) -> bool:
    raise NotImplementedError("Reload strategies should override should_restart()")