BrokerAsyncUsecase
faststream.broker.core.asynchronous.BrokerAsyncUsecase #
BrokerAsyncUsecase(
*args: Any,
apply_types: bool = True,
validate: bool = True,
logger: Optional[Logger] = access_logger,
log_level: int = logging.INFO,
log_fmt: Optional[
str
] = "%(asctime)s %(levelname)s - %(message)s",
dependencies: Sequence[Depends] = (),
decoder: Optional[
CustomDecoder[StreamMessage[MsgType]]
] = None,
parser: Optional[
CustomParser[MsgType, StreamMessage[MsgType]]
] = None,
middlewares: Optional[
Sequence[Callable[[MsgType], BaseMiddleware]]
] = None,
graceful_timeout: Optional[float] = None,
**kwargs: Any
)
Bases: BrokerUsecase[MsgType, ConnectionType]
A class representing a broker async use case.
METHOD | DESCRIPTION |
---|---|
start | Abstract method to start the broker async use case. |
_connect | Any) : Abstract method to connect to the broker. |
_close | Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None) : Abstract method to close the connection to the broker. |
close | Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None) : Close the connection to the broker. |
_process_message | Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]], watcher: BaseWatcher) : Abstract method to process a message. |
publish | SendableMessage, *args: Any, reply_to: str = "", rpc: bool = False, rpc_timeout: Optional[float] |
Initialize the class.
PARAMETER | DESCRIPTION |
---|---|
*args | Variable length arguments TYPE: |
apply_types | Whether to apply types or not TYPE: |
validate | Whether to cast types using Pydantic validation. TYPE: |
logger | Logger object for logging |
log_level | Log level for logging |
log_fmt | Log format for logging TYPE: |
dependencies | Sequence of dependencies TYPE: |
decoder | Custom decoder object TYPE: |
parser | Custom parser object TYPE: |
middlewares | Sequence of middlewares TYPE: |
graceful_timeout | Graceful timeout |
**kwargs | Keyword arguments TYPE: |
Source code in faststream/broker/core/asynchronous.py
close async
#
close(
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exec_tb: Optional[TracebackType] = None,
) -> None
Closes the object.
PARAMETER | DESCRIPTION |
---|---|
exc_type | The type of the exception being handled, if any. TYPE: |
exc_val | The exception instance being handled, if any. TYPE: |
exec_tb | The traceback of the exception being handled, if any. TYPE: |
RETURNS | DESCRIPTION |
---|---|
None | None |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If the method is not implemented. |
Source code in faststream/broker/core/asynchronous.py
connect async
#
Connect to a remote server.
PARAMETER | DESCRIPTION |
---|---|
*args | Variable length argument list. TYPE: |
**kwargs | Arbitrary keyword arguments. TYPE: |
RETURNS | DESCRIPTION |
---|---|
ConnectionType | The connection object. |
Source code in faststream/broker/core/asynchronous.py
include_router #
include_router(router: BrokerRouter[Any, MsgType]) -> None
Includes a router in the current object.
PARAMETER | DESCRIPTION |
---|---|
router | The router to be included. TYPE: |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/core/abc.py
include_routers #
include_routers(
*routers: BrokerRouter[Any, MsgType]
) -> None
Includes routers in the current object.
PARAMETER | DESCRIPTION |
---|---|
*routers | Variable length argument list of routers to include. TYPE: |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/core/abc.py
publish abstractmethod
async
#
publish(
message: SendableMessage,
*args: Any,
reply_to: str = "",
rpc: bool = False,
rpc_timeout: Optional[float] = None,
raise_timeout: bool = False,
**kwargs: Any
) -> Optional[SendableMessage]
Publish a message.
PARAMETER | DESCRIPTION |
---|---|
message | The message to be published. TYPE: |
*args | Additional arguments. TYPE: |
reply_to | The reply-to address for the message. TYPE: |
rpc | Whether the message is for RPC. TYPE: |
rpc_timeout | The timeout for RPC. |
raise_timeout | Whether to raise an exception on timeout. TYPE: |
**kwargs | Additional keyword arguments. TYPE: |
RETURNS | DESCRIPTION |
---|---|
Optional[SendableMessage] | The published message. |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If the method is not implemented. |
Source code in faststream/broker/core/asynchronous.py
publisher abstractmethod
#
publisher(
key: Any, publisher: BasePublisher[MsgType]
) -> BasePublisher[MsgType]
Publishes a publisher.
PARAMETER | DESCRIPTION |
---|---|
key | The key associated with the publisher. TYPE: |
publisher | The publisher to be published. TYPE: |
RETURNS | DESCRIPTION |
---|---|
BasePublisher[MsgType] | The published publisher. |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If the method is not implemented. |
Source code in faststream/broker/core/abc.py
start abstractmethod
async
#
Start the broker async use case.
subscriber abstractmethod
#
subscriber(
*broker_args: Any,
retry: Union[bool, int] = False,
dependencies: Sequence[Depends] = (),
decoder: Optional[
CustomDecoder[StreamMessage[MsgType]]
] = None,
parser: Optional[
CustomParser[MsgType, StreamMessage[MsgType]]
] = None,
middlewares: Optional[
Sequence[Callable[[MsgType], BaseMiddleware]]
] = None,
filter: Filter[StreamMessage[MsgType]] = default_filter,
_raw: bool = False,
_get_dependant: Optional[Any] = None,
**broker_kwargs: Any
) -> Callable[
[
Union[
Callable[P_HandlerParams, T_HandlerReturn],
HandlerCallWrapper[
MsgType, P_HandlerParams, T_HandlerReturn
],
]
],
HandlerCallWrapper[
MsgType, P_HandlerParams, T_HandlerReturn
],
]
A function decorator for subscribing to a message broker.
PARAMETER | DESCRIPTION |
---|---|
*broker_args | Positional arguments to be passed to the message broker. TYPE: |
retry | Whether to retry the subscription if it fails. Can be a boolean or an integer specifying the number of retries. |
dependencies | Sequence of dependencies to be injected into the decorated function. TYPE: |
decoder | Custom decoder function for decoding the message. TYPE: |
parser | Custom parser function for parsing the decoded message. TYPE: |
middlewares | Sequence of middleware functions to be applied to the message. TYPE: |
filter | Filter function for filtering the messages to be processed. TYPE: |
_raw | Whether to return the raw message instead of the processed result. TYPE: |
_get_dependant | Optional argument to get the dependant object. |
**broker_kwargs | Keyword arguments to be passed to the message broker. TYPE: |
RETURNS | DESCRIPTION |
---|---|
Callable[[Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]] | A callable decorator that wraps the decorated function and handles the subscription. |
RAISES | DESCRIPTION |
---|---|
NotImplementedError | If silent animals are not supported. |