Skip to content

Multiprocess

faststream.cli.supervisors.multiprocess.Multiprocess #

Multiprocess(target: DecoratedCallable, args: Tuple[Any, ...], workers: int)

Bases: BaseReload

A class to represent a multiprocess.

METHOD DESCRIPTION
startup

starts the parent process and creates worker processes

shutdown

terminates and joins all worker processes, and stops the parent process

Initialize a new instance of the class.

PARAMETER DESCRIPTION
target

The target callable object to be executed.

TYPE: DecoratedCallable

args

The arguments to be passed to the target callable.

TYPE: Tuple[Any, ...]

workers

The number of workers to be used.

TYPE: int

RETURNS DESCRIPTION
None

None.

Source code in faststream/cli/supervisors/multiprocess.py
def __init__(
    self,
    target: DecoratedCallable,
    args: Tuple[Any, ...],
    workers: int,
) -> None:
    """Initialize a new instance of the class.

    Args:
        target: The target callable object to be executed.
        args: The arguments to be passed to the target callable.
        workers: The number of workers to be used.

    Returns:
        None.

    """
    super().__init__(target, args, None)

    self.workers = workers
    self.processes: List["SpawnProcess"] = []

pid instance-attribute #

pid: int = getpid()

processes instance-attribute #

processes: List[SpawnProcess] = []

reload_delay instance-attribute #

reload_delay: Optional[float] = reload_delay

reloader_name class-attribute instance-attribute #

reloader_name: str = ''

should_exit instance-attribute #

should_exit: Event = Event()

workers instance-attribute #

workers = workers

restart #

restart() -> None
Source code in faststream/cli/supervisors/basereload.py
def restart(self) -> None:
    self._stop_process()
    logger.info("Process successfully reloaded")
    self._process = self._start_process()

run #

run() -> None
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    self.startup()
    while not self.should_exit.wait(self.reload_delay):
        if self.should_restart():  # pragma: no branch
            self.restart()
    self.shutdown()

should_restart #

should_restart() -> bool
Source code in faststream/cli/supervisors/basereload.py
def should_restart(self) -> bool:
    raise NotImplementedError("Reload strategies should override should_restart()")

shutdown #

shutdown() -> None
Source code in faststream/cli/supervisors/multiprocess.py
def shutdown(self) -> None:
    for process in self.processes:
        process.terminate()
        logger.info(f"Stopping child process [{process.pid}]")
        process.join()

    logger.info(f"Stopping parent process [{self.pid}]")

startup #

startup() -> None
Source code in faststream/cli/supervisors/multiprocess.py
def startup(self) -> None:
    logger.info(f"Started parent process [{self.pid}]")

    for _ in range(self.workers):
        process = self._start_process()
        logger.info(f"Started child process [{process.pid}]")
        self.processes.append(process)