Skip to content

run

faststream.cli.main.run

run(ctx, app=Argument(..., help='[python_module:FastStream] - path to your application.'), workers=Option(1, show_default=False, help='Run [workers] applications with process spawning.', envvar='FASTSTREAM_WORKERS'), log_level=Option(notset, case_sensitive=False, help='Set selected level for FastStream and brokers logger objects.', envvar='FASTSTREAM_LOG_LEVEL'), reload=Option(False, '--reload', is_flag=True, help='Restart app at directory files changes.'), watch_extensions=Option((), '--extension', '--ext', '--reload-extension', '--reload-ext', help='List of file extensions to watch by.'), app_dir=Option('.', '--app-dir', help='Look for APP in the specified directory, by adding this to the PYTHONPATH. Defaults to the current working directory.', envvar='FASTSTREAM_APP_DIR'), is_factory=Option(False, '--factory', is_flag=True, help='Treat APP as an application factory.'))

Run [MODULE:APP] FastStream application.

Source code in faststream/cli/main.py
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def run(
    ctx: typer.Context,
    app: str = typer.Argument(
        ...,
        help="[python_module:FastStream] - path to your application.",
    ),
    workers: int = typer.Option(
        1,
        show_default=False,
        help="Run [workers] applications with process spawning.",
        envvar="FASTSTREAM_WORKERS",
    ),
    log_level: LogLevels = typer.Option(
        LogLevels.notset,
        case_sensitive=False,
        help="Set selected level for FastStream and brokers logger objects.",
        envvar="FASTSTREAM_LOG_LEVEL",
    ),
    reload: bool = typer.Option(
        False,
        "--reload",
        is_flag=True,
        help="Restart app at directory files changes.",
    ),
    watch_extensions: List[str] = typer.Option(
        (),
        "--extension",
        "--ext",
        "--reload-extension",
        "--reload-ext",
        help="List of file extensions to watch by.",
    ),
    app_dir: str = typer.Option(
        ".",
        "--app-dir",
        help=(
            "Look for APP in the specified directory, by adding this to the PYTHONPATH."
            " Defaults to the current working directory."
        ),
        envvar="FASTSTREAM_APP_DIR",
    ),
    is_factory: bool = typer.Option(
        False,
        "--factory",
        is_flag=True,
        help="Treat APP as an application factory.",
    ),
) -> None:
    """Run [MODULE:APP] FastStream application."""
    # NOTE(review): the docstring above is deliberately left as a single line;
    # it presumably doubles as the CLI help text, so details live in comments.
    #
    # Dispatch overview:
    #   * `--reload`    -> `WatchReloader` supervising `_run`; if the reloader
    #                      cannot be imported, warn and call `_run` directly.
    #   * `workers > 1` -> `Multiprocess` for `FastStream` apps,
    #                      `ASGIMultiprocess` for `AsgiFastStream` apps,
    #                      `typer.BadParameter` for anything else.
    #   * otherwise     -> hand the imported app object to `_run_imported_app`.
    #
    # Unknown CLI options are tolerated (see `context_settings`) and forwarded
    # to `parse_cli_args` through `ctx.args`.
    #
    # Raises:
    #   SetupError: `--reload` combined with more than one worker.
    #   typer.BadParameter: multi-worker run of an unsupported app type.

    # Only a hint, not an error: the extensions are simply unused without reload.
    if watch_extensions and not reload:
        typer.echo(
            "Extra reload extensions has no effect without `--reload` flag."
            "\nProbably, you forgot it?"
        )

    app, extra = parse_cli_args(app, *ctx.args)
    casted_log_level = get_log_level(log_level)

    # Fail fast: reject the incompatible option combination before mutating
    # sys.path or executing any user import-time code.
    if reload and workers > 1:
        raise SetupError("You can't use reload option with multiprocessing")

    if app_dir:  # pragma: no branch
        sys.path.insert(0, app_dir)

    # Should be imported after sys.path changes
    module_path, app_obj = import_from_string(app, is_factory=is_factory)

    # Positional arguments shared by every `_run`-based launch below.
    args = (app, extra, is_factory, casted_log_level)

    if reload:
        try:
            from faststream.cli.supervisors.watchfiles import WatchReloader
        except ImportError:
            # Reloader unavailable: warn and run the app once without reloading.
            warnings.warn(INSTALL_WATCHFILES, category=ImportWarning, stacklevel=1)
            _run(*args)

        else:
            # Always watch the module location; add the app dir only when the
            # user pointed it somewhere other than the default ".".
            reload_dirs = [str(module_path)]
            if app_dir != ".":
                reload_dirs.append(app_dir)

            WatchReloader(
                target=_run,
                args=args,
                reload_dirs=reload_dirs,
                extra_extensions=watch_extensions,
            ).run()

    elif workers > 1:
        if isinstance(app_obj, FastStream):
            from faststream.cli.supervisors.multiprocess import Multiprocess

            Multiprocess(
                target=_run,
                # Extra trailing positional argument for `_run`; presumably an
                # app-level log level — TODO confirm against `_run`'s signature.
                args=(*args, logging.DEBUG),
                workers=workers,
            ).run()
        elif isinstance(app_obj, AsgiFastStream):
            from faststream.cli.supervisors.asgi_multiprocess import ASGIMultiprocess

            # Note: the target here is the parsed `app` value, not `app_obj`.
            ASGIMultiprocess(
                target=app,
                args=args,  # type: ignore[arg-type]
                workers=workers,
            ).run()
        else:
            raise typer.BadParameter(
                f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}."
            )

    else:
        _run_imported_app(
            app_obj,
            extra_options=extra,
            log_level=casted_log_level,
        )