Skip to content

FastStream

faststream.app.FastStream #

FastStream(broker=None, logger=logger, lifespan=None, title='FastStream', version='0.1.0', description='', terms_of_service=None, license=None, contact=None, tags=None, external_docs=None, identifier=None, on_startup=(), after_startup=(), on_shutdown=(), after_shutdown=())

Bases: AsyncAPIApplication

A class representing a FastStream application.

Source code in faststream/app.py
def __init__(
    self,
    broker: Optional["BrokerUsecase[Any, Any]"] = None,
    logger: Optional["LoggerProto"] = logger,
    lifespan: Optional["Lifespan"] = None,
    # AsyncAPI args,
    title: str = "FastStream",
    version: str = "0.1.0",
    description: str = "",
    terms_of_service: Optional["AnyHttpUrl"] = None,
    license: Optional[Union["License", "LicenseDict", "AnyDict"]] = None,
    contact: Optional[Union["Contact", "ContactDict", "AnyDict"]] = None,
    tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
    external_docs: Optional[
        Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
    ] = None,
    identifier: Optional[str] = None,
    on_startup: Sequence["AnyCallable"] = (),
    after_startup: Sequence["AnyCallable"] = (),
    on_shutdown: Sequence["AnyCallable"] = (),
    after_shutdown: Sequence["AnyCallable"] = (),
    # all options should be copied to AsgiFastStream class
) -> None:
    context.set_global("app", self)

    self.broker = broker
    self.logger = logger
    self.context = context

    self._on_startup_calling = [apply_types(to_async(x)) for x in on_startup]
    self._after_startup_calling = [apply_types(to_async(x)) for x in after_startup]
    self._on_shutdown_calling = [apply_types(to_async(x)) for x in on_shutdown]
    self._after_shutdown_calling = [
        apply_types(to_async(x)) for x in after_shutdown
    ]

    self.lifespan_context = (
        apply_types(
            func=lifespan,
            wrap_model=drop_response_type,
        )
        if lifespan is not None
        else fake_context
    )

    self.should_exit = False

    # AsyncAPI information
    self.title = title
    self.version = version
    self.description = description
    self.terms_of_service = terms_of_service
    self.license = license
    self.contact = contact
    self.identifier = identifier
    self.asyncapi_tags = tags
    self.external_docs = external_docs

broker instance-attribute #

broker = broker

logger instance-attribute #

logger = logger

context instance-attribute #

context = context

lifespan_context instance-attribute #

lifespan_context = apply_types(func=lifespan, wrap_model=drop_response_type) if lifespan is not None else fake_context

should_exit instance-attribute #

should_exit = False

title instance-attribute #

title = title

version instance-attribute #

version = version

description instance-attribute #

description = description

terms_of_service instance-attribute #

terms_of_service = terms_of_service

license instance-attribute #

license = license

contact instance-attribute #

contact = contact

identifier instance-attribute #

identifier = identifier

asyncapi_tags instance-attribute #

asyncapi_tags = tags

external_docs instance-attribute #

external_docs = external_docs

set_broker #

set_broker(broker)

Set already existed App object broker.

Useful then you create/init broker in on_startup hook.

Source code in faststream/app.py
def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Set already existed App object broker.

    Useful then you create/init broker in `on_startup` hook.
    """
    self.broker = broker

on_startup #

on_startup(func)

Add hook running BEFORE broker connected.

This hook also takes an extra CLI options as a kwargs.

Source code in faststream/app.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker connected.

    This hook also takes an extra CLI options as a kwargs.
    """
    self._on_startup_calling.append(apply_types(to_async(func)))
    return func

on_shutdown #

on_shutdown(func)

Add hook running BEFORE broker disconnected.

Source code in faststream/app.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker disconnected."""
    self._on_shutdown_calling.append(apply_types(to_async(func)))
    return func

after_startup #

after_startup(func)

Add hook running AFTER broker connected.

Source code in faststream/app.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker connected."""
    self._after_startup_calling.append(apply_types(to_async(func)))
    return func

after_shutdown #

after_shutdown(func)

Add hook running AFTER broker disconnected.

Source code in faststream/app.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker disconnected."""
    self._after_shutdown_calling.append(apply_types(to_async(func)))
    return func

run async #

run(log_level=logging.INFO, run_extra_options=None, sleep_time=0.1)

Run FastStream Application.

Source code in faststream/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: Optional[Dict[str, "SettingField"]] = None,
    sleep_time: float = 0.1,
) -> None:
    """Run FastStream Application."""
    assert self.broker, "You should setup a broker"  # nosec B101

    set_exit(lambda *_: self.exit(), sync=False)

    async with catch_startup_validation_error(), self.lifespan_context(
        **(run_extra_options or {})
    ):
        try:
            async with anyio.create_task_group() as tg:
                tg.start_soon(self._startup, log_level, run_extra_options)

                # TODO: mv it to event trigger after nats-py fixing
                while not self.should_exit:  # noqa: ASYNC110
                    await anyio.sleep(sleep_time)

                await self._shutdown(log_level)
                tg.cancel_scope.cancel()
        except ExceptionGroup as e:
            for ex in e.exceptions:
                raise ex from None

exit #

exit()

Stop application manually.

Source code in faststream/app.py
def exit(self) -> None:
    """Stop application manually."""
    self.should_exit = True

start async #

start(**run_extra_options)

Executes startup hooks and start broker.

Source code in faststream/app.py
async def start(
    self,
    **run_extra_options: "SettingField",
) -> None:
    """Executes startup hooks and start broker."""
    for func in self._on_startup_calling:
        await func(**run_extra_options)

    if self.broker is not None:
        await self.broker.start()

    for func in self._after_startup_calling:
        await func()

stop async #

stop()

Executes shutdown hooks and stop broker.

Source code in faststream/app.py
async def stop(self) -> None:
    """Executes shutdown hooks and stop broker."""
    for func in self._on_shutdown_calling:
        await func()

    if self.broker is not None:
        await self.broker.close()

    for func in self._after_shutdown_calling:
        await func()