Bases: BaseReload
A class to represent a multiprocess.
METHOD | DESCRIPTION |
startup | starts the parent process and creates worker processes |
shutdown | terminates and joins all worker processes, and stops the parent process |
Initialize a new instance of the class.
PARAMETER | DESCRIPTION |
target | The target callable object to be executed. TYPE: DecoratedCallable |
args | The arguments to be passed to the target callable. TYPE: Tuple[Any, ...] |
workers | The number of workers to be used. TYPE: int |
Source code in faststream/cli/supervisors/multiprocess.py
| def __init__(
self,
target: DecoratedCallable,
args: Tuple[Any, ...],
workers: int,
) -> None:
"""Initialize a new instance of the class.
Args:
target: The target callable object to be executed.
args: The arguments to be passed to the target callable.
workers: The number of workers to be used.
Returns:
None.
"""
super().__init__(target, args, None)
self.workers = workers
self.processes: List["SpawnProcess"] = []
|
processes instance-attribute
processes: List[SpawnProcess] = []
reload_delay instance-attribute
reloader_name class-attribute
instance-attribute
should_exit instance-attribute
workers instance-attribute
restart
Source code in faststream/cli/supervisors/basereload.py
| def restart(self) -> None:
self._stop_process()
logger.info("Process successfully reloaded")
self._process = self._start_process()
|
run
Source code in faststream/cli/supervisors/basereload.py
| def run(self) -> None:
self.startup()
while not self.should_exit.wait(self.reload_delay):
if self.should_restart(): # pragma: no branch
self.restart()
self.shutdown()
|
should_restart
Source code in faststream/cli/supervisors/basereload.py
| def should_restart(self) -> bool:
raise NotImplementedError("Reload strategies should override should_restart()")
|
shutdown
Source code in faststream/cli/supervisors/multiprocess.py
| def shutdown(self) -> None:
for process in self.processes:
process.terminate()
logger.info(f"Stopping child process [{process.pid}]")
process.join()
logger.info(f"Stopping parent process [{self.pid}]")
|
startup
Source code in faststream/cli/supervisors/multiprocess.py
| def startup(self) -> None:
logger.info(f"Started parent process [{self.pid}]")
for _ in range(self.workers):
process = self._start_process()
logger.info(f"Started child process [{process.pid}]")
self.processes.append(process)
|