Skip to content

Multiprocess

faststream.cli.supervisors.multiprocess.Multiprocess #

Multiprocess(
    target: DecoratedCallable,
    args: Tuple[Any, ...],
    workers: int,
)

Bases: BaseReload

A supervisor that runs the target callable in a fixed number of worker processes.

METHOD DESCRIPTION
startup

logs the parent process ID and starts the configured number of worker processes

shutdown

terminates and joins every worker process, then logs that the parent process is stopping

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Initialize a new instance of the class.

PARAMETER DESCRIPTION
target

The target callable object to be executed.

TYPE: DecoratedCallable

args

The arguments to be passed to the target callable.

TYPE: Tuple[Any, ...]

workers

The number of workers to be used.

TYPE: int

RETURNS DESCRIPTION
None

None.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/cli/supervisors/multiprocess.py
def __init__(
    self,
    target: DecoratedCallable,
    args: Tuple[Any, ...],
    workers: int,
) -> None:
    """Create a supervisor that manages ``workers`` child processes.

    Args:
        target: Callable forwarded to the base supervisor; presumably the
            entry point each worker runs (confirm against the base class).
        args: Positional arguments forwarded to the base supervisor
            together with ``target``.
        workers: How many child processes ``startup`` will create.
    """
    # The third base-class argument is passed as None; presumably this is
    # the reload delay, which this supervisor does not use (TODO confirm).
    super().__init__(target, args, None)

    # Handles of started workers: filled by startup(), walked by shutdown().
    self.processes: List[SpawnProcess] = []
    self.workers = workers

pid instance-attribute #

pid: int = os.getpid()

processes instance-attribute #

processes: List[SpawnProcess] = []

reload_delay instance-attribute #

reload_delay: Optional[float] = reload_delay

reloader_name class-attribute instance-attribute #

reloader_name: str = ''

should_exit instance-attribute #

should_exit: threading.Event = threading.Event()

workers instance-attribute #

workers = workers

restart #

restart() -> None
Source code in faststream/cli/supervisors/basereload.py
def restart(self) -> None:
    """Replace the supervised process.

    Calls ``_stop_process()``, logs the reload message, then stores the
    result of ``_start_process()`` on ``self._process``.
    """
    self._stop_process()
    logger.info("Process successfully reloaded")
    replacement = self._start_process()
    self._process = replacement

run #

run() -> None
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    """Run the supervisor loop until ``should_exit`` is set.

    Calls ``startup()`` once, then waits on ``should_exit`` with a timeout
    of ``reload_delay``. Each time the wait times out without the event
    being set, ``should_restart()`` is consulted and ``restart()`` is
    called when it returns a truthy value.

    ``shutdown()`` always runs once ``startup()`` has completed, even if
    the loop raises, so started processes are not left behind. The
    original exception still propagates to the caller.
    """
    # startup() stays outside the try: if it fails, there is no fully
    # started state for shutdown() to tear down.
    self.startup()
    try:
        while not self.should_exit.wait(self.reload_delay):
            if self.should_restart():  # pragma: no branch
                self.restart()
    finally:
        self.shutdown()

should_restart #

should_restart() -> bool
Source code in faststream/cli/supervisors/basereload.py
def should_restart(self) -> bool:
    """Report whether the supervised process should be restarted.

    This base implementation always raises; reload strategies are expected
    to override it.

    Raises:
        NotImplementedError: Always.
    """
    message = "Reload strategies should override should_restart()"
    raise NotImplementedError(message)

shutdown #

shutdown() -> None
Source code in faststream/cli/supervisors/multiprocess.py
def shutdown(self) -> None:
    """Terminate every worker process and wait for all of them to exit.

    All workers are signalled first and joined afterwards, so they shut
    down concurrently instead of one after another. The log lines keep
    the same order as before: one per child, then the parent line.
    """
    # Phase 1: request termination of every worker without blocking.
    for process in self.processes:
        process.terminate()
        logger.info(f"Stopping child process [{process.pid}]")

    # Phase 2: wait for each worker; they are already exiting in parallel.
    for process in self.processes:
        process.join()

    logger.info(f"Stopping parent process [{self.pid}]")

startup #

startup() -> None
Source code in faststream/cli/supervisors/multiprocess.py
def startup(self) -> None:
    """Log the parent PID and start ``self.workers`` child processes.

    Each process returned by ``_start_process()`` is logged and then
    appended to ``self.processes`` in creation order.
    """
    logger.info(f"Started parent process [{self.pid}]")

    for _worker_index in range(self.workers):
        child = self._start_process()
        logger.info(f"Started child process [{child.pid}]")
        self.processes.append(child)

Last update: 2023-11-13