Skip to content

FastStream

faststream.FastStream #

FastStream(broker: Optional[BrokerAsyncUsecase[Any, Any]] = None, logger: Optional[Logger] = logger, lifespan: Optional[Lifespan] = None, title: str = 'FastStream', version: str = '0.1.0', description: str = '', terms_of_service: Optional[AnyHttpUrl] = None, license: Optional[Union[License, LicenseDict, AnyDict]] = None, contact: Optional[Union[Contact, ContactDict, AnyDict]] = None, identifier: Optional[str] = None, tags: Optional[Sequence[Union[Tag, TagDict, AnyDict]]] = None, external_docs: Optional[Union[ExternalDocs, ExternalDocsDict, AnyDict]] = None)

Bases: ABCApp

A class representing a FastStream application.

METHOD DESCRIPTION
__init__

initializes the FastStream application

on_startup

adds a hook to run before the broker is connected

on_shutdown

adds a hook to run before the broker is disconnected

after_startup

adds a hook to run after the broker is connected

after_shutdown

adds a hook to run after the broker is disconnected

run

runs the FastStream application

_start

starts the FastStream application

_stop

stops the FastStream application

_startup

runs the startup hooks

_shutdown

runs the shutdown hooks

__exit

exits the FastStream application

Asynchronous FastStream Application class.

stores and run broker, control hooks

PARAMETER DESCRIPTION
broker

async broker to run (may be None, then specify by set_broker)

TYPE: Optional[BrokerAsyncUsecase[Any, Any]] DEFAULT: None

logger

logger object to log startup/shutdown messages (None to disable)

TYPE: Optional[Logger] DEFAULT: logger

lifespan

lifespan context to run application

TYPE: Optional[Lifespan] DEFAULT: None

title

application title - for AsyncAPI docs

TYPE: str DEFAULT: 'FastStream'

version

application version - for AsyncAPI docs

TYPE: str DEFAULT: '0.1.0'

description

application description - for AsyncAPI docs

TYPE: str DEFAULT: ''

terms_of_service

application terms of service - for AsyncAPI docs

TYPE: Optional[AnyHttpUrl] DEFAULT: None

license

application license - for AsyncAPI docs

TYPE: Optional[Union[License, LicenseDict, AnyDict]] DEFAULT: None

contact

application contact - for AsyncAPI docs

TYPE: Optional[Union[Contact, ContactDict, AnyDict]] DEFAULT: None

identifier

application identifier - for AsyncAPI docs

TYPE: Optional[str] DEFAULT: None

tags

application tags - for AsyncAPI docs

TYPE: Optional[Sequence[Union[Tag, TagDict, AnyDict]]] DEFAULT: None

external_docs

application external docs - for AsyncAPI docs

TYPE: Optional[Union[ExternalDocs, ExternalDocsDict, AnyDict]] DEFAULT: None

Source code in faststream/app.py
def __init__(
    self,
    broker: Optional[BrokerAsyncUsecase[Any, Any]] = None,
    logger: Optional[logging.Logger] = logger,
    lifespan: Optional[Lifespan] = None,
    # AsyncAPI args,
    title: str = "FastStream",
    version: str = "0.1.0",
    description: str = "",
    terms_of_service: Optional[AnyHttpUrl] = None,
    license: Optional[Union[License, LicenseDict, AnyDict]] = None,
    contact: Optional[Union[Contact, ContactDict, AnyDict]] = None,
    identifier: Optional[str] = None,
    tags: Optional[Sequence[Union[Tag, TagDict, AnyDict]]] = None,
    external_docs: Optional[Union[ExternalDocs, ExternalDocsDict, AnyDict]] = None,
) -> None:
    """Asynchronous FastStream Application class.

    stores and run broker, control hooks

    Args:
        broker: async broker to run (may be `None`, then specify by `set_broker`)
        logger: logger object to log startup/shutdown messages (`None` to disable)
        lifespan: lifespan context to run application
        title: application title - for AsyncAPI docs
        version: application version - for AsyncAPI docs
        description: application description - for AsyncAPI docs
        terms_of_service: application terms of service - for AsyncAPI docs
        license: application license - for AsyncAPI docs
        contact: application contact - for AsyncAPI docs
        identifier: application identifier - for AsyncAPI docs
        tags: application tags - for AsyncAPI docs
        external_docs: application external docs - for AsyncAPI docs
    """
    super().__init__(
        broker=broker,
        logger=logger,
        title=title,
        version=version,
        description=description,
        terms_of_service=terms_of_service,
        license=license,
        contact=contact,
        identifier=identifier,
        tags=tags,
        external_docs=external_docs,
    )

    self.lifespan_context = (
        apply_types(
            func=lifespan,
            wrap_model=drop_response_type,
        )
        if lifespan is not None
        else fake_context
    )

asyncapi_tags instance-attribute #

asyncapi_tags = tags

broker instance-attribute #

broker = broker

contact instance-attribute #

contact = contact

context instance-attribute #

context = context

description instance-attribute #

description = description

external_docs instance-attribute #

external_docs = external_docs

identifier instance-attribute #

identifier = identifier

license instance-attribute #

license = license

lifespan_context instance-attribute #

lifespan_context = apply_types(func=lifespan, wrap_model=drop_response_type) if lifespan is not None else fake_context

logger instance-attribute #

logger = logger

terms_of_service instance-attribute #

terms_of_service = terms_of_service

title instance-attribute #

title = title

version instance-attribute #

version = version

after_shutdown #

after_shutdown(func: Callable[P_HookParams, T_HookReturn]) -> Callable[P_HookParams, T_HookReturn]

Add hook running AFTER broker disconnected.

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker disconnected.

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().after_shutdown(to_async(func))
    return func

after_startup #

after_startup(func: Callable[P_HookParams, T_HookReturn]) -> Callable[P_HookParams, T_HookReturn]

Add hook running AFTER broker connected.

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker connected.

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().after_startup(to_async(func))
    return func

on_shutdown #

on_shutdown(func: Callable[P_HookParams, T_HookReturn]) -> Callable[P_HookParams, T_HookReturn]

Add hook running BEFORE broker disconnected.

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker disconnected.

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().on_shutdown(to_async(func))
    return func

on_startup #

on_startup(func: Callable[P_HookParams, T_HookReturn]) -> Callable[P_HookParams, T_HookReturn]

Add hook running BEFORE broker connected.

This hook also takes an extra CLI options as a kwargs

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker connected.

    This hook also takes an extra CLI options as a kwargs

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().on_startup(to_async(func))
    return func

run async #

run(log_level: int = logging.INFO, run_extra_options: Optional[Dict[str, SettingField]] = None) -> None

Run FastStream Application.

PARAMETER DESCRIPTION
log_level

force application log level

TYPE: int DEFAULT: INFO

run_extra_options

extra options for running the app

TYPE: Optional[Dict[str, SettingField]] DEFAULT: None

RETURNS DESCRIPTION
None

Block an event loop until stopped

Source code in faststream/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: Optional[Dict[str, SettingField]] = None,
) -> None:
    """Run FastStream Application.

    Args:
        log_level: force application log level
        run_extra_options: extra options for running the app

    Returns:
        Block an event loop until stopped
    """
    assert self.broker, "You should setup a broker"  # nosec B101

    async with self.lifespan_context(**(run_extra_options or {})):
        try:
            async with anyio.create_task_group() as tg:
                tg.start_soon(self._start, log_level, run_extra_options)
                await self._stop(log_level)
                tg.cancel_scope.cancel()
        except ExceptionGroup as e:
            for ex in e.exceptions:
                raise ex from None

set_broker #

set_broker(broker: BrokerAsyncUsecase[Any, Any]) -> None

Set already existed App object broker.

Useful then you create/init broker in on_startup hook.

Source code in faststream/app.py
def set_broker(self, broker: BrokerAsyncUsecase[Any, Any]) -> None:
    """Set already existed App object broker.

    Useful then you create/init broker in `on_startup` hook.
    """
    self.broker = broker