BrokerAsyncUsecase
faststream.broker.core.asynchronous.BrokerAsyncUsecase
BrokerAsyncUsecase(*args: Any, apply_types: bool = True, validate: bool = True, logger: Optional[Logger] = access_logger, log_level: int = logging.INFO, log_fmt: Optional[str] = '%(asctime)s %(levelname)s - %(message)s', dependencies: Sequence[Depends] = (), decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None, parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None, middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None, graceful_timeout: Optional[float] = None, **kwargs: Any)
Bases: BrokerUsecase[MsgType, ConnectionType]
A class representing a broker async use case.
METHOD | DESCRIPTION |
---|---|
start | Abstract method to start the broker async use case. |
_connect | Abstract method to connect to the broker. |
_close | Abstract method to close the connection to the broker. Takes the same optional exception arguments as close. |
close | Close the connection to the broker. |
_process_message | Abstract method to process a message. Takes a Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]] and a watcher: BaseWatcher. |
publish | Abstract method to publish a message. |
Initialize the class.
PARAMETER | DESCRIPTION |
---|---|
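*args | Variable length arguments. TYPE: Any DEFAULT: () |
apply_types | Whether to apply types or not. TYPE: bool DEFAULT: True |
validate | Whether to cast types using Pydantic validation. TYPE: bool DEFAULT: True |
logger | Logger object for logging. TYPE: Optional[Logger] DEFAULT: access_logger |
log_level | Log level for logging. TYPE: int DEFAULT: logging.INFO |
log_fmt | Log format for logging. TYPE: Optional[str] DEFAULT: '%(asctime)s %(levelname)s - %(message)s' |
dependencies | Sequence of dependencies. TYPE: Sequence[Depends] DEFAULT: () |
decoder | Custom decoder object. TYPE: Optional[CustomDecoder[StreamMessage[MsgType]]] DEFAULT: None |
parser | Custom parser object. TYPE: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] DEFAULT: None |
middlewares | Sequence of middlewares. TYPE: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] DEFAULT: None |
graceful_timeout | Graceful timeout. TYPE: Optional[float] DEFAULT: None |
**kwargs | Keyword arguments. TYPE: Any DEFAULT: {} |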
Source code in faststream/broker/core/asynchronous.py
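The defaults for `log_level` and `log_fmt` are ordinary standard-library `logging` values. As a minimal sketch that does not touch FastStream at all (the logger name `demo` and the helper `render` are hypothetical), this shows how a format string of that shape renders a single record:

```python
import logging

# Default values copied from the constructor signature above.
LOG_LEVEL = logging.INFO
LOG_FMT = "%(asctime)s %(levelname)s - %(message)s"


def render(message: str, level: int = LOG_LEVEL) -> str:
    """Format one record with LOG_FMT (hypothetical helper, not FastStream API)."""
    record = logging.LogRecord(
        name="demo",
        level=level,
        pathname="demo.py",
        lineno=1,
        msg=message,
        args=(),
        exc_info=None,
    )
    return logging.Formatter(LOG_FMT).format(record)


if __name__ == "__main__":
    # Prints a timestamp followed by "INFO - broker started".
    print(render("broker started"))
```

Passing a different `log_fmt` string would change only the layout of each line; the level threshold is controlled separately by `log_level`.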
close async
close(exc_type: Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None) -> None
Close the connection to the broker.
PARAMETER | DESCRIPTION |
---|---|
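exc_type | The type of the exception being handled, if any. TYPE: Optional[Type[BaseException]] DEFAULT: None |
exc_val | The exception instance being handled, if any. TYPE: Optional[BaseException] DEFAULT: None |
exec_tb | The traceback of the exception being handled, if any. TYPE: Optional[TracebackType] DEFAULT: None |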
RETURNS | DESCRIPTION |
---|---|
None | None |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If the method is not implemented. |
Source code in faststream/broker/core/asynchronous.py
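The three parameters of `close` line up positionally with the arguments Python passes to `__aexit__`, so a class with this signature can forward them directly when used in `async with`. The sketch below illustrates that pattern with a hypothetical `ToyBroker`; it is a stand-in for illustration and not the FastStream implementation.

```python
import asyncio
from types import TracebackType
from typing import Optional, Type


class ToyBroker:
    """Hypothetical stand-in that only tracks connection state."""

    def __init__(self) -> None:
        self.connected = False
        self.last_exc_type: Optional[Type[BaseException]] = None

    async def connect(self) -> "ToyBroker":
        self.connected = True
        return self

    async def close(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exec_tb: Optional[TracebackType] = None,
    ) -> None:
        # Remember why we were closed, then drop the connection.
        self.last_exc_type = exc_type
        self.connected = False

    async def __aenter__(self) -> "ToyBroker":
        return await self.connect()

    async def __aexit__(self, exc_type, exc_val, exec_tb) -> None:
        # Returning None lets any in-flight exception propagate.
        await self.close(exc_type, exc_val, exec_tb)


async def demo() -> ToyBroker:
    broker = ToyBroker()
    try:
        async with broker:
            raise ValueError("handler failed")
    except ValueError:
        pass
    return broker


if __name__ == "__main__":
    closed = asyncio.run(demo())
    print(closed.connected, closed.last_exc_type)
```

Calling `close()` with no arguments covers the normal shutdown path, which is why all three parameters default to `None`.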
connect async
Connect to the broker.
PARAMETER | DESCRIPTION |
---|---|
*args | Variable length argument list. |
**kwargs | Arbitrary keyword arguments. |
RETURNS | DESCRIPTION |
---|---|
ConnectionType | The connection object. |
Source code in faststream/broker/core/asynchronous.py
include_router
include_router(router: BrokerRouter[Any, MsgType]) -> None
Includes a router in the current object.
PARAMETER | DESCRIPTION |
---|---|
router | The router to be included. TYPE: BrokerRouter[Any, MsgType] |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/core/abc.py
include_routers
include_routers(*routers: BrokerRouter[Any, MsgType]) -> None
Includes routers in the current object.
PARAMETER | DESCRIPTION |
---|---|
*routers | Variable length argument list of routers to include. TYPE: BrokerRouter[Any, MsgType] DEFAULT: () |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/core/abc.py
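The relationship between `include_router` and `include_routers` can be pictured as a single-item method plus a variadic convenience wrapper. The following is a minimal sketch under that assumption; `ToyRouter`, `ToyBroker`, and the handler registry are hypothetical and stand in for the real `BrokerRouter` machinery.

```python
from typing import Callable, Dict

Handler = Callable[[str], str]


class ToyRouter:
    """Hypothetical router: a named bag of handlers."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}

    def add(self, name: str, handler: Handler) -> None:
        self.handlers[name] = handler


class ToyBroker:
    """Hypothetical broker that absorbs handlers from routers."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}

    def include_router(self, router: ToyRouter) -> None:
        # Copy every handler the router knows about.
        self.handlers.update(router.handlers)

    def include_routers(self, *routers: ToyRouter) -> None:
        # Variadic form: include each router in the order given.
        for router in routers:
            self.include_router(router)


orders = ToyRouter()
orders.add("orders", str.upper)
users = ToyRouter()
users.add("users", str.lower)

broker = ToyBroker()
broker.include_routers(orders, users)
```

After the call, `broker.handlers` holds the handlers from both routers, keyed by name.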
publish abstractmethod async
publish(message: SendableMessage, *args: Any, reply_to: str = '', rpc: bool = False, rpc_timeout: Optional[float] = None, raise_timeout: bool = False, **kwargs: Any) -> Optional[SendableMessage]
Publish a message.
PARAMETER | DESCRIPTION |
---|---|
message | The message to be published. TYPE: SendableMessage |
*args | Additional arguments. TYPE: Any DEFAULT: () |
reply_to | The reply-to address for the message. TYPE: str DEFAULT: '' |
rpc | Whether the message is for RPC. TYPE: bool DEFAULT: False |
rpc_timeout | The timeout for RPC. TYPE: Optional[float] DEFAULT: None |
raise_timeout | Whether to raise an exception on timeout. TYPE: bool DEFAULT: False |
**kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |
RETURNS | DESCRIPTION |
---|---|
Optional[SendableMessage] | The published message. |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If the method is not implemented. |
Source code in faststream/broker/core/asynchronous.py
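The `rpc_timeout` and `raise_timeout` parameters suggest a wait-with-deadline pattern. The sketch below shows one plausible reading using only `asyncio`: wait for a reply up to `rpc_timeout` seconds, and on timeout either re-raise or return `None` depending on `raise_timeout`. That reading is an assumption, the helper names `wait_for_reply` and `slow_reply` are hypothetical, and concrete broker implementations may behave differently.

```python
import asyncio
from typing import Awaitable, Optional


async def wait_for_reply(
    reply: Awaitable[str],
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
) -> Optional[str]:
    """Await `reply`, giving up after `rpc_timeout` seconds (None waits forever)."""
    try:
        return await asyncio.wait_for(reply, timeout=rpc_timeout)
    except asyncio.TimeoutError:
        if raise_timeout:
            raise
        # Assumed behaviour: swallow the timeout and signal "no reply".
        return None


async def slow_reply() -> str:
    """Hypothetical remote handler that answers after 0.2 seconds."""
    await asyncio.sleep(0.2)
    return "pong"


if __name__ == "__main__":
    print(asyncio.run(wait_for_reply(slow_reply(), rpc_timeout=1.0)))
    print(asyncio.run(wait_for_reply(slow_reply(), rpc_timeout=0.01)))
```

Under this reading, a caller that leaves `raise_timeout` at its default has to check the result for `None` before using it.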
publisher abstractmethod
publisher(key: Any, publisher: BasePublisher[MsgType]) -> BasePublisher[MsgType]
Registers a publisher under the given key.
PARAMETER | DESCRIPTION |
---|---|
key | The key associated with the publisher. TYPE: Any |
publisher | The publisher to register. TYPE: BasePublisher[MsgType] |
RETURNS | DESCRIPTION |
---|---|
BasePublisher[MsgType] | The registered publisher. |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If the method is not implemented. |
Source code in faststream/broker/core/abc.py
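A method that takes a `key` and a publisher object and returns that same object reads naturally as a keyed registry. The sketch below illustrates that idea only; `ToyPublisher`, `ToyBroker`, and the `_publishers` dictionary are hypothetical names, and the real storage details are not described in this reference.

```python
from typing import Any, Dict


class ToyPublisher:
    """Hypothetical publisher that only remembers its destination."""

    def __init__(self, destination: str) -> None:
        self.destination = destination


class ToyBroker:
    """Hypothetical broker keeping publishers in a dictionary."""

    def __init__(self) -> None:
        self._publishers: Dict[Any, ToyPublisher] = {}

    def publisher(self, key: Any, publisher: ToyPublisher) -> ToyPublisher:
        # Store under the key and hand the same object back to the caller.
        self._publishers[key] = publisher
        return publisher


broker = ToyBroker()
out = broker.publisher("replies", ToyPublisher("replies-queue"))
```

Returning the publisher lets the caller register it and keep a reference in one expression.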
start abstractmethod async
Start the broker async use case.
subscriber abstractmethod
subscriber(*broker_args: Any, retry: Union[bool, int] = False, dependencies: Sequence[Depends] = (), decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None, parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None, middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None, filter: Filter[StreamMessage[MsgType]] = default_filter, _raw: bool = False, _get_dependant: Optional[Any] = None, **broker_kwargs: Any) -> Callable[[Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]
A function decorator for subscribing to a message broker.
PARAMETER | DESCRIPTION |
---|---|
*broker_args | Positional arguments to be passed to the message broker. TYPE: Any DEFAULT: () |
retry | Whether to retry the subscription if it fails. Can be a boolean or an integer specifying the number of retries. TYPE: Union[bool, int] DEFAULT: False |
dependencies | Sequence of dependencies to be injected into the decorated function. TYPE: Sequence[Depends] DEFAULT: () |
decoder | Custom decoder function for decoding the message. TYPE: Optional[CustomDecoder[StreamMessage[MsgType]]] DEFAULT: None |
parser | Custom parser function for parsing the decoded message. TYPE: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] DEFAULT: None |
middlewares | Sequence of middleware functions to be applied to the message. TYPE: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] DEFAULT: None |
filter | Filter function for filtering the messages to be processed. TYPE: Filter[StreamMessage[MsgType]] DEFAULT: default_filter |
_raw | Whether to return the raw message instead of the processed result. TYPE: bool DEFAULT: False |
_get_dependant | Optional argument to get the dependant object. TYPE: Optional[Any] DEFAULT: None |
**broker_kwargs | Keyword arguments to be passed to the message broker. TYPE: Any DEFAULT: {} |
RETURNS | DESCRIPTION |
---|---|
Callable[[Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]] | A callable decorator that wraps the decorated function and handles the subscription. |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If the method is not implemented. |
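Because `subscriber` returns a decorator rather than decorating directly, it follows the decorator-factory pattern: the outer call captures options such as `filter`, and the inner callable receives the handler. The sketch below shows that shape with a hypothetical `ToyBroker` and a hypothetical `dispatch` helper. Unlike the real method, it returns the handler unchanged instead of a `HandlerCallWrapper`, and it treats messages as plain dictionaries.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Message = Dict[str, Any]
Handler = Callable[[Message], Any]
Filter = Callable[[Message], bool]


def accept_all(message: Message) -> bool:
    """Hypothetical default filter that accepts every message."""
    return True


class ToyBroker:
    def __init__(self) -> None:
        self._handlers: List[Tuple[Filter, Handler]] = []

    def subscriber(
        self, *broker_args: Any, filter: Filter = accept_all, **broker_kwargs: Any
    ) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            # Register the handler together with the filter captured above.
            self._handlers.append((filter, func))
            return func

        return decorator

    def dispatch(self, message: Message) -> Optional[Any]:
        # The first handler whose filter accepts the message wins.
        for accepts, handler in self._handlers:
            if accepts(message):
                return handler(message)
        return None


broker = ToyBroker()


@broker.subscriber("orders", filter=lambda m: m.get("kind") == "json")
def handle_json(message: Message) -> str:
    return f"json:{message['body']}"


@broker.subscriber("orders")
def handle_default(message: Message) -> str:
    return f"default:{message['body']}"
```

Registering the filtered handler before the catch-all one matters in this sketch, since dispatch stops at the first match.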