Skip to content

AsgiFastStream

faststream.asgi.app.AsgiFastStream #

AsgiFastStream(broker=None, /, asgi_routes=(), asyncapi_path=None, logger=logger, lifespan=None, title='FastStream', version='0.1.0', description='', terms_of_service=None, license=None, contact=None, tags=None, external_docs=None, identifier=None, on_startup=(), after_startup=(), on_shutdown=(), after_shutdown=())

Bases: Application

Source code in faststream/asgi/app.py
def __init__(
    self,
    broker: Optional["BrokerUsecase[Any, Any]"] = None,
    /,
    asgi_routes: Sequence[Tuple[str, "ASGIApp"]] = (),
    asyncapi_path: Optional[str] = None,
    # regular broker args
    logger: Optional["LoggerProto"] = logger,
    lifespan: Optional["Lifespan"] = None,
    # AsyncAPI args,
    title: str = "FastStream",
    version: str = "0.1.0",
    description: str = "",
    terms_of_service: Optional["AnyHttpUrl"] = None,
    license: Optional[Union["License", "LicenseDict", "AnyDict"]] = None,
    contact: Optional[Union["Contact", "ContactDict", "AnyDict"]] = None,
    tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
    external_docs: Optional[
        Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
    ] = None,
    identifier: Optional[str] = None,
    on_startup: Sequence["AnyCallable"] = (),
    after_startup: Sequence["AnyCallable"] = (),
    on_shutdown: Sequence["AnyCallable"] = (),
    after_shutdown: Sequence["AnyCallable"] = (),
) -> None:
    """Initialize the ASGI application.

    Every argument except ``asgi_routes`` and ``asyncapi_path`` is forwarded
    unchanged to the parent initializer. ``asgi_routes`` is copied into the
    ``routes`` list, and when ``asyncapi_path`` is truthy the app returned by
    ``make_asyncapi_asgi(self)`` is mounted at that path.

    ``broker`` is positional-only.
    """
    super().__init__(
        broker=broker,
        logger=logger,
        lifespan=lifespan,
        title=title,
        version=version,
        description=description,
        terms_of_service=terms_of_service,
        license=license,
        contact=contact,
        tags=tags,
        external_docs=external_docs,
        identifier=identifier,
        on_startup=on_startup,
        after_startup=after_startup,
        on_shutdown=on_shutdown,
        after_shutdown=after_shutdown,
    )

    # Copy into a list so `mount` can append to it later.
    self.routes = list(asgi_routes)
    # An empty string or None skips the AsyncAPI route.
    if asyncapi_path:
        self.mount(asyncapi_path, make_asyncapi_asgi(self))

broker instance-attribute #

broker = broker

title instance-attribute #

title = title

version instance-attribute #

version = version

description instance-attribute #

description = description

terms_of_service instance-attribute #

terms_of_service = terms_of_service

license instance-attribute #

license = license

contact instance-attribute #

contact = contact

asyncapi_tags instance-attribute #

asyncapi_tags = tags

external_docs instance-attribute #

external_docs = external_docs

identifier instance-attribute #

identifier = identifier

logger instance-attribute #

logger = logger

context instance-attribute #

context = context

lifespan_context instance-attribute #

lifespan_context = apply_types(func=lifespan, wrap_model=drop_response_type)

routes instance-attribute #

routes = list(asgi_routes)

set_broker #

set_broker(broker)

Set the broker on an already existing App object.

Useful when you create/init the broker in an on_startup hook.

Source code in faststream/_internal/application.py
def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Set the broker on an already existing App object.

    Useful when you create/init the broker in an `on_startup` hook.
    """
    self.broker = broker

on_startup #

on_startup(func)

Add hook running BEFORE broker connected.

This hook also receives extra CLI options as kwargs.

Source code in faststream/_internal/application.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register a hook that runs BEFORE the broker is connected.

    The hook also receives extra CLI options as kwargs.

    Returns ``func`` itself, so this method can be used as a decorator.
    """
    hooks = self._on_startup_calling
    # Stored form: made async first, then passed through `apply_types`.
    prepared = apply_types(to_async(func))
    hooks.append(prepared)
    return func

on_shutdown #

on_shutdown(func)

Add hook running BEFORE broker disconnected.

Source code in faststream/_internal/application.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register a hook that runs BEFORE the broker is disconnected.

    Returns ``func`` itself, so this method can be used as a decorator.
    """
    hooks = self._on_shutdown_calling
    # Stored form: made async first, then passed through `apply_types`.
    prepared = apply_types(to_async(func))
    hooks.append(prepared)
    return func

after_startup #

after_startup(func)

Add hook running AFTER broker connected.

Source code in faststream/_internal/application.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register a hook that runs AFTER the broker is connected.

    Returns ``func`` itself, so this method can be used as a decorator.
    """
    hooks = self._after_startup_calling
    # Stored form: made async first, then passed through `apply_types`.
    prepared = apply_types(to_async(func))
    hooks.append(prepared)
    return func

after_shutdown #

after_shutdown(func)

Add hook running AFTER broker disconnected.

Source code in faststream/_internal/application.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register a hook that runs AFTER the broker is disconnected.

    Returns ``func`` itself, so this method can be used as a decorator.
    """
    hooks = self._after_shutdown_calling
    # Stored form: made async first, then passed through `apply_types`.
    prepared = apply_types(to_async(func))
    hooks.append(prepared)
    return func

exit #

exit()

Stop application manually.

Source code in faststream/_internal/application.py
def exit(self) -> None:
    """Stop application manually.

    This method only raises the ``_should_exit`` flag; whatever reacts to
    the flag lives outside this method.
    """
    self._should_exit = True

start async #

start(**run_extra_options)

Executes startup hooks and starts the broker.

Source code in faststream/_internal/application.py
async def start(
    self,
    **run_extra_options: "SettingField",
) -> None:
    """Run the startup hooks and start the broker.

    Order: every ``on_startup`` hook (each called with
    ``run_extra_options`` as keyword arguments), then ``broker.start()``
    when a broker is set, then every ``after_startup`` hook (called
    without arguments).
    """
    for before_hook in self._on_startup_calling:
        await before_hook(**run_extra_options)

    # The broker is optional; it may be attached later via `set_broker`.
    broker = self.broker
    if broker is not None:
        await broker.start()

    for after_hook in self._after_startup_calling:
        await after_hook()

stop async #

stop()

Executes shutdown hooks and stops the broker.

Source code in faststream/_internal/application.py
async def stop(self) -> None:
    """Run the shutdown hooks and close the broker.

    Order: every ``on_shutdown`` hook, then ``broker.close()`` when a
    broker is set, then every ``after_shutdown`` hook. All hooks are
    called without arguments.
    """
    for before_hook in self._on_shutdown_calling:
        await before_hook()

    # Nothing to close when the app was never given a broker.
    broker = self.broker
    if broker is not None:
        await broker.close()

    for after_hook in self._after_shutdown_calling:
        await after_hook()

from_app classmethod #

from_app(app, asgi_routes, asyncapi_path=None)
Source code in faststream/asgi/app.py
@classmethod
def from_app(
    cls,
    app: Application,
    asgi_routes: Sequence[Tuple[str, "ASGIApp"]],
    asyncapi_path: Optional[str] = None,
) -> "AsgiFastStream":
    """Build an ASGI application from an existing ``Application``.

    The broker, logger and AsyncAPI metadata are read from ``app`` and
    passed to the constructor. The lifespan context and the four hook
    lists are then assigned from ``app`` by reference, so both objects
    refer to the same hook lists afterwards.
    """
    asgi_app = cls(
        app.broker,
        asgi_routes=asgi_routes,
        asyncapi_path=asyncapi_path,
        logger=app.logger,
        # The lifespan is taken over below as an already built context.
        lifespan=None,
        title=app.title,
        version=app.version,
        description=app.description,
        terms_of_service=app.terms_of_service,
        license=app.license,
        contact=app.contact,
        tags=app.asyncapi_tags,
        external_docs=app.external_docs,
        identifier=app.identifier,
    )

    # State handed over as-is (same objects, no copies).
    shared_attrs = (
        "lifespan_context",
        "_on_startup_calling",
        "_after_startup_calling",
        "_on_shutdown_calling",
        "_after_shutdown_calling",
    )
    for attr_name in shared_attrs:
        setattr(asgi_app, attr_name, getattr(app, attr_name))

    return asgi_app

mount #

mount(path, route)
Source code in faststream/asgi/app.py
def mount(self, path: str, route: "ASGIApp") -> None:
    """Append the ``(path, route)`` pair to ``self.routes``."""
    entry = (path, route)
    self.routes.append(entry)

run async #

run(log_level=INFO, run_extra_options=None, sleep_time=0.1)
Source code in faststream/asgi/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: Optional[Dict[str, "SettingField"]] = None,
    sleep_time: float = 0.1,
) -> None:
    """Serve this ASGI app through gunicorn with uvicorn workers.

    Args:
        log_level: Accepted but not read by this method.
        run_extra_options: Options handed to gunicorn. ``host`` and
            ``port`` are folded into ``bind``; every other key is applied
            only if it is a known gunicorn setting and its value is not
            ``None``. The mapping is copied first, so the caller's dict
            is never modified.
        sleep_time: Accepted but not read by this method.

    Raises:
        RuntimeError: If ``uvicorn`` or ``gunicorn`` cannot be imported.
    """
    try:
        import uvicorn  # noqa: F401
        from gunicorn.app.base import BaseApplication
    except ImportError as e:
        raise RuntimeError(
            "You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn"
        ) from e

    class ASGIRunner(BaseApplication):  # type: ignore[misc]
        """Gunicorn application wrapper that serves an in-memory ASGI app."""

        def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None:
            # Attributes must exist before the base initializer runs,
            # because it is expected to call `load_config` (see gunicorn's
            # custom application docs).
            self.options = options
            self.asgi_app = asgi_app
            super().__init__()

        def load_config(self) -> None:
            # Unknown keys and unset (None) values are skipped.
            for k, v in self.options.items():
                if k in self.cfg.settings and v is not None:
                    self.cfg.set(k.lower(), v)

        def load(self) -> "ASGIApp":
            return self.asgi_app

    # Work on a shallow copy: the code below pops and overwrites keys, and
    # that must not leak back into the dict the caller passed in.
    run_extra_options = dict(run_extra_options or {})

    bindings: List[str] = []
    host = run_extra_options.pop("host", None)
    port = run_extra_options.pop("port", None)
    if host is not None and port is not None:
        bindings.append(f"{host}:{port}")
    elif host is not None:
        bindings.append(f"{host}:8000")
    elif port is not None:
        bindings.append(f"127.0.0.1:{port}")

    # An explicit `bind` option is added after the host/port binding.
    bind = run_extra_options.get("bind")
    if isinstance(bind, list):
        bindings.extend(bind)  # type: ignore
    elif isinstance(bind, str):
        bindings.append(bind)

    run_extra_options["bind"] = bindings or "127.0.0.1:8000"
    #  We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
    run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

    ASGIRunner(run_extra_options, self).run()

start_lifespan_context async #

start_lifespan_context()
Source code in faststream/asgi/app.py
@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
    """Keep the application running for the duration of the ``async with`` body.

    Enters an anyio task group together with ``self.lifespan_context()``,
    schedules ``self._startup`` in the task group and yields to the caller.
    When the body finishes (normally or with an exception),
    ``self._shutdown()`` is awaited and the task group is cancelled.
    """
    async with anyio.create_task_group() as tg, self.lifespan_context():
        # Startup is scheduled in the task group, not awaited here.
        tg.start_soon(self._startup)

        try:
            yield
        finally:
            # Shut down first, then cancel whatever is still running in
            # the task group.
            await self._shutdown()
            tg.cancel_scope.cancel()

lifespan async #

lifespan(scope, receive, send)

Handle ASGI lifespan messages to start and shutdown the app.

Source code in faststream/asgi/app.py
async def lifespan(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    """Handle ASGI lifespan messages to start and shut down the app.

    Sends ``lifespan.startup.complete`` once the lifespan context has been
    entered and ``lifespan.shutdown.complete`` after it has been left. If
    anything raises, a ``*.failed`` message carrying the formatted traceback
    is sent for the current phase and the exception is re-raised.
    """
    await receive()  # consume the `lifespan.startup` event
    startup_done = False

    try:
        async with self.start_lifespan_context():
            await send({"type": "lifespan.startup.complete"})
            startup_done = True
            await receive()  # wait for the `lifespan.shutdown` event
    except BaseException:
        details = traceback.format_exc()
        # Report the failure against the phase that was in progress.
        failure_type = (
            "lifespan.shutdown.failed"
            if startup_done
            else "lifespan.startup.failed"
        )
        await send({"type": failure_type, "message": details})
        raise

    # Only reached when the context was left without an exception.
    await send({"type": "lifespan.shutdown.complete"})

not_found async #

not_found(scope, receive, send)
Source code in faststream/asgi/app.py
async def not_found(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    """Reply to a request that no route handles.

    WebSocket scopes are answered with a close frame (code 1000); every
    other scope gets a 404 response. Both carry the same message text.
    """
    message = "App doesn't support regular HTTP protocol."

    # Pick the responder for the scope type, then invoke it once.
    if scope["type"] == "websocket":
        responder = WebSocketClose(code=1000, reason=message)
    else:
        responder = AsgiResponse(body=message.encode(), status_code=404)

    await responder(scope, receive, send)