Skip to content

ASGIMultiprocess

faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess #

ASGIMultiprocess(target, args, workers)
Source code in faststream/cli/supervisors/asgi_multiprocess.py
def __init__(
    self, target: str, args: Tuple[str, Dict[str, str], bool, int], workers: int
) -> None:
    _, uvicorn_kwargs, is_factory, log_level = args
    self._target = target
    self._uvicorn_kwargs = cast_uvicorn_params(uvicorn_kwargs or {})
    self._workers = workers
    self._is_factory = is_factory
    self._log_level = log_level

run #

run()
Source code in faststream/cli/supervisors/asgi_multiprocess.py
def run(self) -> None:
    try:
        import uvicorn
    except ImportError as e:
        raise RuntimeError(
            "You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
        ) from e

    uvicorn_params = set(inspect.signature(uvicorn.run).parameters.keys())

    uvicorn.run(
        self._target,
        factory=self._is_factory,
        workers=self._workers,
        log_level=self._log_level,
        **{
            key: v
            for key, v in self._uvicorn_kwargs.items()
            if key in uvicorn_params
        },
    )