Skip to content

FastStream

faststream.FastStream #

FastStream(
    broker: Optional[BrokerAsyncUsecase[Any, Any]] = None,
    logger: Optional[logging.Logger] = logger,
    title: str = "FastStream",
    version: str = "0.1.0",
    description: str = "",
    terms_of_service: Optional[AnyHttpUrl] = None,
    license: Optional[
        Union[License, LicenseDict, AnyDict]
    ] = None,
    contact: Optional[
        Union[Contact, ContactDict, AnyDict]
    ] = None,
    identifier: Optional[str] = None,
    tags: Optional[
        Sequence[Union[Tag, TagDict, AnyDict]]
    ] = None,
    external_docs: Optional[
        Union[ExternalDocs, ExternalDocsDict, AnyDict]
    ] = None,
)

Bases: ABCApp

A class representing a FastStream application.

METHOD DESCRIPTION
__init__

initializes the FastStream application

on_startup

adds a hook to run before the broker is connected

on_shutdown

adds a hook to run before the broker is disconnected

after_startup

adds a hook to run after the broker is connected

after_shutdown

adds a hook to run after the broker is disconnected

run

runs the FastStream application

_init_async_cycle

initializes the async cycle

_start

starts the FastStream application

_stop

stops the FastStream application

_startup

runs the startup hooks

_shutdown

runs the shutdown hooks

__exit

exits the FastStream application

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Asyncronous FastStream Application class

stores and run broker, control hooks

PARAMETER DESCRIPTION
broker

async broker to run (may be None, then specify by set_broker)

TYPE: Optional[BrokerAsyncUsecase[Any, Any]] DEFAULT: None

logger

logger object to log startup/shutdown messages (None to disable)

TYPE: Optional[Logger] DEFAULT: logger

title

application title - for AsyncAPI docs

TYPE: str DEFAULT: 'FastStream'

version

application version - for AsyncAPI docs

TYPE: str DEFAULT: '0.1.0'

description

application description - for AsyncAPI docs

TYPE: str DEFAULT: ''

Source code in faststream/app.py
def __init__(
    self,
    broker: Optional[BrokerAsyncUsecase[Any, Any]] = None,
    logger: Optional[logging.Logger] = logger,
    # AsyncAPI args,
    title: str = "FastStream",
    version: str = "0.1.0",
    description: str = "",
    terms_of_service: Optional[AnyHttpUrl] = None,
    license: Optional[Union[License, LicenseDict, AnyDict]] = None,
    contact: Optional[Union[Contact, ContactDict, AnyDict]] = None,
    identifier: Optional[str] = None,
    tags: Optional[Sequence[Union[Tag, TagDict, AnyDict]]] = None,
    external_docs: Optional[Union[ExternalDocs, ExternalDocsDict, AnyDict]] = None,
):
    """Asyncronous FastStream Application class

    stores and run broker, control hooks

    Args:
        broker: async broker to run (may be `None`, then specify by `set_broker`)
        logger: logger object to log startup/shutdown messages (`None` to disable)
        title: application title - for AsyncAPI docs
        version: application version - for AsyncAPI docs
        description: application description - for AsyncAPI docs
    """
    super().__init__(
        broker=broker,
        logger=logger,
        title=title,
        version=version,
        description=description,
        terms_of_service=terms_of_service,
        license=license,
        contact=contact,
        identifier=identifier,
        tags=tags,
        external_docs=external_docs,
    )

    self._stop_event = None

    set_exit(lambda *_: self.__exit())

asyncapi_tags instance-attribute #

asyncapi_tags = tags

broker instance-attribute #

broker = broker

contact instance-attribute #

contact = contact

context instance-attribute #

context = context

description instance-attribute #

description = description

external_docs instance-attribute #

external_docs = external_docs

identifier instance-attribute #

identifier = identifier

license instance-attribute #

license = license

logger instance-attribute #

logger = logger

terms_of_service instance-attribute #

terms_of_service = terms_of_service

title instance-attribute #

title = title

version instance-attribute #

version = version

after_shutdown #

after_shutdown(
    func: Callable[P_HookParams, T_HookReturn]
) -> Callable[P_HookParams, T_HookReturn]

Add hook running AFTER broker disconnected

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker disconnected

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().after_shutdown(to_async(func))
    return func

after_startup #

after_startup(
    func: Callable[P_HookParams, T_HookReturn]
) -> Callable[P_HookParams, T_HookReturn]

Add hook running AFTER broker connected

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker connected

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().after_startup(to_async(func))
    return func

on_shutdown #

on_shutdown(
    func: Callable[P_HookParams, T_HookReturn]
) -> Callable[P_HookParams, T_HookReturn]

Add hook running BEFORE broker disconnected

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker disconnected

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().on_shutdown(to_async(func))
    return func

on_startup #

on_startup(
    func: Callable[P_HookParams, T_HookReturn]
) -> Callable[P_HookParams, T_HookReturn]

Add hook running BEFORE broker connected

This hook also takes an extra CLI options as a kwargs

PARAMETER DESCRIPTION
func

async or sync func to call as a hook

TYPE: Callable[P_HookParams, T_HookReturn]

RETURNS DESCRIPTION
Callable[P_HookParams, T_HookReturn]

Async version of the func argument

Source code in faststream/app.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker connected

    This hook also takes an extra CLI options as a kwargs

    Args:
        func: async or sync func to call as a hook

    Returns:
        Async version of the func argument
    """
    super().on_startup(to_async(func))
    return func

run async #

run(
    log_level: int = logging.INFO,
    run_extra_options: Optional[
        Dict[str, SettingField]
    ] = None,
) -> None

Run FastStream Application

PARAMETER DESCRIPTION
log_level

force application log level

TYPE: int DEFAULT: INFO

RETURNS DESCRIPTION
None

Block an event loop until stopped

Source code in faststream/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: Optional[Dict[str, SettingField]] = None,
) -> None:
    """Run FastStream Application

    Args:
        log_level: force application log level

    Returns:
        Block an event loop until stopped
    """
    assert self.broker, "You should setup a broker"  # nosec B101

    self._init_async_cycle()
    async with anyio.create_task_group() as tg:
        tg.start_soon(self._start, log_level, run_extra_options)
        await self._stop(log_level)
        tg.cancel_scope.cancel()

set_broker #

set_broker(broker: BrokerAsyncUsecase[Any, Any]) -> None

Set already existed App object broker Usefull then you create/init broker in on_startup hook

Source code in faststream/app.py
def set_broker(self, broker: BrokerAsyncUsecase[Any, Any]) -> None:
    """Set already existed App object broker
    Usefull then you create/init broker in `on_startup` hook"""
    self.broker = broker

Last update: 2023-11-13