ABCApp
faststream.app.ABCApp #
ABCApp(broker: Optional[BrokerAsyncUsecase[Any, Any]] = None, logger: Optional[Logger] = logger, title: str = 'FastStream', version: str = '0.1.0', description: str = '', terms_of_service: Optional[AnyHttpUrl] = None, license: Optional[Union[License, LicenseDict, AnyDict]] = None, contact: Optional[Union[Contact, ContactDict, AnyDict]] = None, identifier: Optional[str] = None, tags: Optional[Sequence[Union[Tag, TagDict, AnyDict]]] = None, external_docs: Optional[Union[ExternalDocs, ExternalDocsDict, AnyDict]] = None)
Bases: ABC
A class representing an ABC App.
METHOD | DESCRIPTION |
---|---|
set_broker | Set the broker object |
on_startup | Add a hook to be run before the broker is connected |
on_shutdown | Add a hook to be run before the broker is disconnected |
after_startup | Add a hook to be run after the broker is connected |
after_shutdown | Add a hook to be run after the broker is disconnected |
_log | Log a message at a specified |
Initialize an instance of the class.
PARAMETER | DESCRIPTION |
---|---|
broker | An optional instance of the BrokerAsyncUsecase class. TYPE: |
logger | An optional instance of the logging.Logger class. |
title | A string representing the title of the AsyncAPI. TYPE: |
version | A string representing the version of the AsyncAPI. TYPE: |
description | A string representing the description of the AsyncAPI. TYPE: |
terms_of_service | An optional URL representing the terms of service of the AsyncAPI. TYPE: |
license | An optional instance of the License class. TYPE: |
contact | An optional instance of the Contact class. TYPE: |
identifier | An optional string representing the identifier of the AsyncAPI. |
tags | An optional sequence of Tag instances. TYPE: |
external_docs | An optional instance of the ExternalDocs class. TYPE: |
Source code in faststream/app.py
after_shutdown #
Add hook running AFTER broker disconnected.
after_startup #
Add hook running AFTER broker connected.
on_shutdown #
Add hook running BEFORE broker disconnected.
on_startup #
Add hook running BEFORE broker connected.
This hook also takes an extra CLI options as a kwargs.
Source code in faststream/app.py
set_broker #
set_broker(broker: BrokerAsyncUsecase[Any, Any]) -> None
Set already existed App object broker.
Useful then you create/init broker in on_startup
hook.