Multiprocess(target, args, workers, reload_delay=0.5)
Bases: BaseReload
A supervisor that runs the target in several worker processes and restarts any worker that has exited.
Source code in faststream/cli/supervisors/multiprocess.py
def __init__(
    self,
    target: "DecoratedCallable",
    args: Tuple[Any, ...],
    workers: int,
    reload_delay: float = 0.5,
) -> None:
    """Set up a supervisor for ``workers`` child processes.

    ``target``, ``args`` and ``reload_delay`` are forwarded unchanged to the
    base-class initializer; ``workers`` is stored on the instance.
    """
    super().__init__(target, args, reload_delay)
    # No child is spawned here: the list starts out empty.
    self.processes: List[SpawnProcess] = []
    self.workers = workers
reload_delay instance-attribute
should_exit instance-attribute
reloader_name class-attribute instance-attribute
workers instance-attribute
processes instance-attribute
run
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    """Run the supervisor loop from startup to shutdown.

    After ``startup()``, the loop waits on ``should_exit`` for at most
    ``reload_delay`` per iteration. A truthy wait result ends the loop;
    otherwise ``restart()`` is called whenever ``should_restart()`` says so.
    ``shutdown()`` runs once the loop has ended.
    """
    self.startup()

    while True:
        exit_requested = self.should_exit.wait(self.reload_delay)
        if exit_requested:
            break
        if self.should_restart():  # pragma: no branch
            self.restart()

    self.shutdown()
startup
Source code in faststream/cli/supervisors/multiprocess.py
def startup(self) -> None:
    """Log the parent pid, then spawn and track ``self.workers`` children.

    Each child comes from ``self._start_process()``; its pid is logged
    before the child is appended to ``self.processes``.
    """
    logger.info(f"Started parent process [{self.pid}]")

    for _ in range(self.workers):
        child = self._start_process()
        child_pid = child.pid
        logger.info(f"Started child process [{child_pid}]")
        self.processes.append(child)
shutdown
Source code in faststream/cli/supervisors/multiprocess.py
def shutdown(self) -> None:
    """Stop every tracked child process, then log that the parent is stopping.

    Termination is requested for all children before any of them is
    joined. Joining inside the same loop would make each child wait for
    the previous one to exit before it is even signalled, so the total
    shutdown time would be the sum of the individual exit times and the
    later workers would keep running in the meantime.
    """
    # Phase 1: ask every child to stop so that they wind down concurrently.
    for process in self.processes:
        process.terminate()
        logger.info(f"Stopping child process [{process.pid}]")

    # Phase 2: wait for each child to actually exit.
    for process in self.processes:
        process.join()

    logger.info(f"Stopping parent process [{self.pid}]")
restart
Source code in faststream/cli/supervisors/multiprocess.py
def restart(self) -> None:
    """Replace every child process that is no longer alive.

    Children that are still alive are kept as they are. For each dead
    child the pid and exit code are logged as an error (with an
    out-of-memory hint when the exit code matches SIGKILL), and a new
    child from ``self._start_process()`` is tracked in its place.
    """
    # signal.SIGKILL is only defined on Unix. Looking it up with getattr keeps
    # this method from raising AttributeError on platforms without it (e.g.
    # Windows) as soon as a worker exits with a non-zero code.
    sigkill = getattr(signal, "SIGKILL", None)

    active_processes = []

    for process in self.processes:
        if process.is_alive():
            active_processes.append(process)
            continue

        pid = process.pid
        exitcode = process.exitcode

        log_msg = "Worker (pid:%s) exited with code %s."
        if exitcode and sigkill is not None and abs(exitcode) == sigkill:
            log_msg += " Perhaps out of memory?"
        logger.error(log_msg, pid, exitcode)

        # Defensive: is_alive() has already reported this process as dead.
        process.kill()

        new_process = self._start_process()
        logger.info(f"Started child process [{new_process.pid}]")
        active_processes.append(new_process)

    self.processes = active_processes
should_restart
Source code in faststream/cli/supervisors/multiprocess.py
def should_restart(self) -> bool:
    """Return True when at least one tracked process is no longer alive."""
    return any(not worker.is_alive() for worker in self.processes)