ABCApp
faststream.app.ABCApp #
ABCApp(
broker: Optional[BrokerAsyncUsecase[Any, Any]] = None,
logger: Optional[Logger] = logger,
title: str = "FastStream",
version: str = "0.1.0",
description: str = "",
terms_of_service: Optional[AnyHttpUrl] = None,
license: Optional[
Union[License, LicenseDict, AnyDict]
] = None,
contact: Optional[
Union[Contact, ContactDict, AnyDict]
] = None,
identifier: Optional[str] = None,
tags: Optional[
Sequence[Union[Tag, TagDict, AnyDict]]
] = None,
external_docs: Optional[
Union[ExternalDocs, ExternalDocsDict, AnyDict]
] = None,
)
Bases: ABC
A class representing an ABC App.
METHOD | DESCRIPTION |
---|---|
set_broker | Set the broker object |
on_startup | Add a hook to be run before the broker is connected |
on_shutdown | Add a hook to be run before the broker is disconnected |
after_startup | Add a hook to be run after the broker is connected |
after_shutdown | Add a hook to be run after the broker is disconnected |
_log | Log a message at a specified |
Initialize an instance of the class.
PARAMETER | DESCRIPTION |
---|---|
broker | An optional instance of the BrokerAsyncUsecase class. TYPE: |
logger | An optional instance of the logging.Logger class. |
title | A string representing the title of the AsyncAPI. TYPE: |
version | A string representing the version of the AsyncAPI. TYPE: |
description | A string representing the description of the AsyncAPI. TYPE: |
terms_of_service | An optional URL representing the terms of service of the AsyncAPI. TYPE: |
license | An optional instance of the License class. TYPE: |
contact | An optional instance of the Contact class. TYPE: |
identifier | An optional string representing the identifier of the AsyncAPI. |
tags | An optional sequence of Tag instances. TYPE: |
external_docs | An optional instance of the ExternalDocs class. TYPE: |
Source code in faststream/app.py
after_shutdown #
after_shutdown(
func: Callable[P_HookParams, T_HookReturn]
) -> Callable[P_HookParams, T_HookReturn]
Add hook running AFTER broker disconnected.
after_startup #
Add hook running AFTER broker connected.
on_shutdown #
Add hook running BEFORE broker disconnected.
on_startup #
Add hook running BEFORE broker connected.
This hook also takes an extra CLI options as a kwargs.
Source code in faststream/app.py
set_broker #
set_broker(broker: BrokerAsyncUsecase[Any, Any]) -> None
Set already existed App object broker.
Useful then you create/init broker in on_startup
hook.