Skip to content

AsgiFastStream

faststream.asgi.AsgiFastStream #

AsgiFastStream(broker=None, /, asgi_routes=(), asyncapi_path=None, logger=logger, lifespan=None, title='FastStream', version='0.1.0', description='', terms_of_service=None, license=None, contact=None, tags=None, external_docs=None, identifier=None, on_startup=(), after_startup=(), on_shutdown=(), after_shutdown=())

Bases: FastStream

Source code in faststream/asgi/app.py
def __init__(
    self,
    broker: Optional["BrokerUsecase[Any, Any]"] = None,
    /,
    asgi_routes: Sequence[Tuple[str, "ASGIApp"]] = (),
    asyncapi_path: Optional[str] = None,
    # regular broker args
    logger: Optional["LoggerProto"] = logger,
    lifespan: Optional["Lifespan"] = None,
    # AsyncAPI args,
    title: str = "FastStream",
    version: str = "0.1.0",
    description: str = "",
    terms_of_service: Optional["AnyHttpUrl"] = None,
    license: Optional[Union["License", "LicenseDict", "AnyDict"]] = None,
    contact: Optional[Union["Contact", "ContactDict", "AnyDict"]] = None,
    tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
    external_docs: Optional[
        Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
    ] = None,
    identifier: Optional[str] = None,
    on_startup: Sequence["AnyCallable"] = (),
    after_startup: Sequence["AnyCallable"] = (),
    on_shutdown: Sequence["AnyCallable"] = (),
    after_shutdown: Sequence["AnyCallable"] = (),
) -> None:
    super().__init__(
        broker=broker,
        logger=logger,
        lifespan=lifespan,
        title=title,
        version=version,
        description=description,
        terms_of_service=terms_of_service,
        license=license,
        contact=contact,
        tags=tags,
        external_docs=external_docs,
        identifier=identifier,
        on_startup=on_startup,
        after_startup=after_startup,
        on_shutdown=on_shutdown,
        after_shutdown=after_shutdown,
    )

    self.routes = list(asgi_routes)
    if asyncapi_path:
        self.mount(asyncapi_path, make_asyncapi_asgi(self))

broker instance-attribute #

broker = broker

title instance-attribute #

title = title

version instance-attribute #

version = version

description instance-attribute #

description = description

terms_of_service instance-attribute #

terms_of_service = terms_of_service

license instance-attribute #

license = license

contact instance-attribute #

contact = contact

asyncapi_tags instance-attribute #

asyncapi_tags = tags

external_docs instance-attribute #

external_docs = external_docs

identifier instance-attribute #

identifier = identifier

logger instance-attribute #

logger = logger

context instance-attribute #

context = context

lifespan_context instance-attribute #

lifespan_context = apply_types(func=lifespan, wrap_model=drop_response_type) if lifespan is not None else fake_context

should_exit instance-attribute #

should_exit = False

routes instance-attribute #

routes = list(asgi_routes)

set_broker #

set_broker(broker)

Set already existed App object broker.

Useful then you create/init broker in on_startup hook.

Source code in faststream/app.py
def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Set already existed App object broker.

    Useful then you create/init broker in `on_startup` hook.
    """
    self.broker = broker

on_startup #

on_startup(func)

Add hook running BEFORE broker connected.

This hook also takes an extra CLI options as a kwargs.

Source code in faststream/app.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker connected.

    This hook also takes an extra CLI options as a kwargs.
    """
    self._on_startup_calling.append(apply_types(to_async(func)))
    return func

on_shutdown #

on_shutdown(func)

Add hook running BEFORE broker disconnected.

Source code in faststream/app.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker disconnected."""
    self._on_shutdown_calling.append(apply_types(to_async(func)))
    return func

after_startup #

after_startup(func)

Add hook running AFTER broker connected.

Source code in faststream/app.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker connected."""
    self._after_startup_calling.append(apply_types(to_async(func)))
    return func

after_shutdown #

after_shutdown(func)

Add hook running AFTER broker disconnected.

Source code in faststream/app.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker disconnected."""
    self._after_shutdown_calling.append(apply_types(to_async(func)))
    return func

run async #

run(log_level=logging.INFO, run_extra_options=None, sleep_time=0.1)

Run FastStream Application.

Source code in faststream/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: Optional[Dict[str, "SettingField"]] = None,
    sleep_time: float = 0.1,
) -> None:
    """Run FastStream Application."""
    assert self.broker, "You should setup a broker"  # nosec B101

    set_exit(lambda *_: self.exit(), sync=False)

    async with catch_startup_validation_error(), self.lifespan_context(
        **(run_extra_options or {})
    ):
        try:
            async with anyio.create_task_group() as tg:
                tg.start_soon(self._startup, log_level, run_extra_options)

                # TODO: mv it to event trigger after nats-py fixing
                while not self.should_exit:  # noqa: ASYNC110
                    await anyio.sleep(sleep_time)

                await self._shutdown(log_level)
                tg.cancel_scope.cancel()
        except ExceptionGroup as e:
            for ex in e.exceptions:
                raise ex from None

exit #

exit()

Stop application manually.

Source code in faststream/app.py
def exit(self) -> None:
    """Stop application manually."""
    self.should_exit = True

start async #

start(**run_extra_options)

Executes startup hooks and start broker.

Source code in faststream/app.py
async def start(
    self,
    **run_extra_options: "SettingField",
) -> None:
    """Executes startup hooks and start broker."""
    for func in self._on_startup_calling:
        await func(**run_extra_options)

    if self.broker is not None:
        await self.broker.start()

    for func in self._after_startup_calling:
        await func()

stop async #

stop()

Executes shutdown hooks and stop broker.

Source code in faststream/app.py
async def stop(self) -> None:
    """Executes shutdown hooks and stop broker."""
    for func in self._on_shutdown_calling:
        await func()

    if self.broker is not None:
        await self.broker.close()

    for func in self._after_shutdown_calling:
        await func()

mount #

mount(path, route)
Source code in faststream/asgi/app.py
def mount(self, path: str, route: "ASGIApp") -> None:
    self.routes.append((path, route))

start_lifespan_context async #

start_lifespan_context()
Source code in faststream/asgi/app.py
@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
    async with anyio.create_task_group() as tg, self.lifespan_context():
        tg.start_soon(self._startup)

        try:
            yield
        finally:
            await self._shutdown()
            tg.cancel_scope.cancel()

lifespan async #

lifespan(scope, receive, send)

Handle ASGI lifespan messages to start and shutdown the app.

Source code in faststream/asgi/app.py
async def lifespan(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    """Handle ASGI lifespan messages to start and shutdown the app."""
    started = False
    await receive()  # handle `lifespan.startup` event

    try:
        async with self.start_lifespan_context():
            await send({"type": "lifespan.startup.complete"})
            started = True
            await receive()  # handle `lifespan.shutdown` event

    except BaseException:
        exc_text = traceback.format_exc()
        if started:
            await send({"type": "lifespan.shutdown.failed", "message": exc_text})
        else:
            await send({"type": "lifespan.startup.failed", "message": exc_text})
        raise

    else:
        await send({"type": "lifespan.shutdown.complete"})

not_found async #

not_found(scope, receive, send)
Source code in faststream/asgi/app.py
async def not_found(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    not_found_msg = "FastStream doesn't support regular HTTP protocol."

    if scope["type"] == "websocket":
        websocket_close = WebSocketClose(
            code=1000,
            reason=not_found_msg,
        )
        await websocket_close(scope, receive, send)
        return

    response = AsgiResponse(
        body=not_found_msg.encode(),
        status_code=404,
    )

    await response(scope, receive, send)