Skip to content

import_from_string

faststream.cli.utils.imports.import_from_string #

import_from_string(import_str)

Import FastStream application from module specified by a string.

Source code in faststream/cli/utils/imports.py
def import_from_string(import_str: str) -> Tuple[Path, "FastStream"]:
    """Import FastStream application from module specified by a string.

    Args:
        import_str: Import target in the form ``"<module>:<attribute>"``. The
            attribute part may be dotted (``"module:obj.app"``); each
            component is then resolved with ``getattr`` in turn.

    Returns:
        A ``(module_path, instance)`` tuple. When the module is importable,
        ``module_path`` is the directory containing the module's file (or the
        current working directory if the module has no file) and ``instance``
        is the resolved attribute. Otherwise both values come from the
        ``get_app_path`` / ``try_import_app`` fallback.

    Raises:
        typer.BadParameter: If ``import_str`` is not a string, is not in
            ``"<module>:<attribute>"`` format, or the attribute chain cannot
            be resolved on the imported module.
    """
    if not isinstance(import_str, str):
        raise typer.BadParameter("Given value is not of type string")

    module_str, _, attrs_str = import_str.partition(":")
    if not module_str or not attrs_str:
        raise typer.BadParameter(
            f'Import string "{import_str}" must be in format "<module>:<attribute>"'
        )

    try:
        module = importlib.import_module(  # nosemgrep: python.lang.security.audit.non-literal-import.non-literal-import
            module_str
        )

    except ModuleNotFoundError:
        # The module is not importable by name; hand the raw import string to
        # the path-based helpers instead.
        # NOTE(review): this also catches ModuleNotFoundError raised by imports
        # *inside* the target module — confirm the fallback reports that case
        # sensibly.
        module_path, app_name = get_app_path(import_str)
        return module_path, try_import_app(module_path, app_name)

    # Walk the (possibly dotted) attribute chain starting from the module.
    attr = module
    try:
        for attr_str in attrs_str.split("."):
            attr = getattr(attr, attr_str)

    except AttributeError as e:
        typer.echo(e, err=True)
        raise typer.BadParameter(
            f'Attribute "{attrs_str}" not found in module "{module_str}".'
        ) from e

    instance = attr  # type: ignore[assignment]

    # Built-in / statically linked modules (e.g. ``sys``) have no ``__file__``
    # attribute at all, and it may be ``None`` for others, so read it
    # defensively rather than letting an AttributeError escape.
    module_file = getattr(module, "__file__", None)
    if module_file:
        module_path = Path(module_file).resolve().parent
    else:
        module_path = Path.cwd()

    return module_path, instance