Skip to content

Multiprocess

faststream.cli.supervisors.multiprocess.Multiprocess #

Multiprocess(target, args, workers, reload_delay=0.5)

Bases: BaseReload

A class to represent a multiprocess.

Source code in faststream/cli/supervisors/multiprocess.py
def __init__(
    self,
    target: "DecoratedCallable",
    args: Tuple[Any, ...],
    workers: int,
    reload_delay: float = 0.5,
) -> None:
    super().__init__(target, args, reload_delay)

    self.workers = workers
    self.processes: List[SpawnProcess] = []

reload_delay instance-attribute #

reload_delay = reload_delay

should_exit instance-attribute #

should_exit = Event()

pid instance-attribute #

pid = getpid()

reloader_name class-attribute instance-attribute #

reloader_name = ''

workers instance-attribute #

workers = workers

processes instance-attribute #

processes = []

run #

run()
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    self.startup()
    while not self.should_exit.wait(self.reload_delay):
        if self.should_restart():  # pragma: no branch
            self.restart()
    self.shutdown()

startup #

startup()
Source code in faststream/cli/supervisors/multiprocess.py
def startup(self) -> None:
    logger.info(f"Started parent process [{self.pid}]")

    for _ in range(self.workers):
        process = self._start_process()
        logger.info(f"Started child process [{process.pid}]")
        self.processes.append(process)

shutdown #

shutdown()
Source code in faststream/cli/supervisors/multiprocess.py
def shutdown(self) -> None:
    for process in self.processes:
        process.terminate()
        logger.info(f"Stopping child process [{process.pid}]")
        process.join()

    logger.info(f"Stopping parent process [{self.pid}]")

restart #

restart()
Source code in faststream/cli/supervisors/multiprocess.py
def restart(self) -> None:
    active_processes = []

    for process in self.processes:
        if process.is_alive():
            active_processes.append(process)
            continue

        pid = process.pid
        exitcode = process.exitcode

        log_msg = "Worker (pid:%s) exited with code %s."
        if exitcode and abs(exitcode) == signal.SIGKILL:
            log_msg += " Perhaps out of memory?"
        logger.error(log_msg, pid, exitcode)

        process.kill()

        new_process = self._start_process()
        logger.info(f"Started child process [{new_process.pid}]")
        active_processes.append(new_process)

    self.processes = active_processes

should_restart #

should_restart()
Source code in faststream/cli/supervisors/multiprocess.py
def should_restart(self) -> bool:
    return not all(p.is_alive() for p in self.processes)