Skip to content

ASGIMultiprocess

faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess #

ASGIMultiprocess(target, args, workers)
Source code in faststream/cli/supervisors/asgi_multiprocess.py
def __init__(
    self,
    target: str,
    args: Tuple[str, Dict[str, str], bool, Optional[Path], int],
    workers: int,
) -> None:
    _, run_extra_options, is_factory, _, log_level = args
    self._target = target
    self._run_extra_options = cast_uvicorn_params(run_extra_options or {})
    self._workers = workers
    self._is_factory = is_factory
    self._log_level = log_level

run #

run()
Source code in faststream/cli/supervisors/asgi_multiprocess.py
def run(self) -> None:
    if not HAS_UVICORN:
        raise ImportError(INSTALL_UVICORN)

    config = UvicornExtraConfig(
        app=self._target,
        factory=self._is_factory,
        log_level=self._log_level,
        workers=self._workers,
        **{
            key: v
            for key, v in self._run_extra_options.items()
            if key in set(inspect.signature(uvicorn.Config).parameters.keys())
        },
        run_extra_options=self._run_extra_options,
    )
    server = uvicorn.Server(config)
    sock = config.bind_socket()
    UvicornMultiprocess(config, target=server.run, sockets=[sock]).run()