Skip to content

BrokerAsyncUsecase

faststream.broker.core.asynchronous.BrokerAsyncUsecase #

BrokerAsyncUsecase(*args: Any, apply_types: bool = True, validate: bool = True, logger: Optional[Logger] = access_logger, log_level: int = logging.INFO, log_fmt: Optional[str] = '%(asctime)s %(levelname)s - %(message)s', dependencies: Sequence[Depends] = (), decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None, parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None, middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None, graceful_timeout: Optional[float] = None, **kwargs: Any)

Bases: BrokerUsecase[MsgType, ConnectionType]

A class representing a broker async use case.

METHOD DESCRIPTION
start

Abstract method to start the broker async use case.

_connect

Abstract method to connect to the broker; accepts keyword arguments typed Any.

_close

(exc_type: Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None): Abstract method to close the connection to the broker.

close

(exc_type: Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None): Close the connection to the broker.

_process_message

Abstract method to process a message. Takes a handler callable of type Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]] and a watcher: BaseWatcher.

publish

(message: SendableMessage, *args: Any, reply_to: str = "", rpc: bool = False, rpc_timeout: Optional[float] = None, raise_timeout: bool = False, **kwargs: Any): Publish a message.

Initialize the class.

PARAMETER DESCRIPTION
*args

Variable length arguments

TYPE: Any DEFAULT: ()

apply_types

Whether to apply types or not

TYPE: bool DEFAULT: True

validate

Whether to cast types using Pydantic validation.

TYPE: bool DEFAULT: True

logger

Logger object for logging

TYPE: Optional[Logger] DEFAULT: access_logger

log_level

Log level for logging

TYPE: int DEFAULT: INFO

log_fmt

Log format for logging

TYPE: Optional[str] DEFAULT: '%(asctime)s %(levelname)s - %(message)s'

dependencies

Sequence of dependencies

TYPE: Sequence[Depends] DEFAULT: ()

decoder

Custom decoder object

TYPE: Optional[CustomDecoder[StreamMessage[MsgType]]] DEFAULT: None

parser

Custom parser object

TYPE: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] DEFAULT: None

middlewares

Sequence of middlewares

TYPE: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] DEFAULT: None

graceful_timeout

Graceful timeout

TYPE: Optional[float] DEFAULT: None

**kwargs

Keyword arguments

TYPE: Any DEFAULT: {}

Source code in faststream/broker/core/asynchronous.py
def __init__(
    self,
    *args: Any,
    apply_types: bool = True,
    validate: bool = True,
    logger: Optional[logging.Logger] = access_logger,
    log_level: int = logging.INFO,
    log_fmt: Optional[str] = "%(asctime)s %(levelname)s - %(message)s",
    dependencies: Sequence[Depends] = (),
    decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
    parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
    middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None,
    graceful_timeout: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Set up the async broker, wrapping custom decoder/parser via ``to_async``.

    Every argument except ``graceful_timeout`` is handed to the parent
    initializer; ``graceful_timeout`` is stored on the instance afterwards.

    Args:
        *args: Positional arguments forwarded unchanged to the parent.
        apply_types: Whether to apply types or not (forwarded to the parent).
        validate: Whether to cast types using Pydantic validation
            (forwarded to the parent).
        logger: Logger object for logging (forwarded to the parent).
        log_level: Log level for logging (forwarded to the parent).
        log_fmt: Log format for logging (forwarded to the parent).
        dependencies: Sequence of dependencies (forwarded to the parent).
        decoder: Custom decoder; when truthy it is passed through
            ``to_async`` before being forwarded, otherwise ``None`` is sent.
        parser: Custom parser; handled the same way as ``decoder``.
        middlewares: Sequence of middlewares (forwarded to the parent).
        graceful_timeout: Value stored as ``self.graceful_timeout``.
        **kwargs: Extra keyword arguments forwarded unchanged to the parent.

    """
    # Falsy decoder/parser values are normalised to None rather than wrapped.
    async_decoder = cast(
        Optional[AsyncCustomDecoder[StreamMessage[MsgType]]],
        to_async(decoder) if decoder else None,
    )
    async_parser = cast(
        Optional[AsyncCustomParser[MsgType, StreamMessage[MsgType]]],
        to_async(parser) if parser else None,
    )

    super().__init__(
        *args,
        apply_types=apply_types,
        validate=validate,
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
        dependencies=dependencies,
        decoder=async_decoder,
        parser=async_parser,
        middlewares=middlewares,
        **kwargs,
    )

    self.graceful_timeout = graceful_timeout

dependencies instance-attribute #

dependencies: Sequence[Depends] = dependencies

description instance-attribute #

description = description

fmt property #

fmt: str

Getter method for _fmt attribute.

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

handlers instance-attribute #

handlers: Mapping[Any, AsyncHandler[MsgType]]

log_level instance-attribute #

log_level: int

logger instance-attribute #

logger: Optional[Logger]

middlewares instance-attribute #

middlewares: Sequence[Callable[[MsgType], BaseMiddleware]]

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

security instance-attribute #

security = security

started instance-attribute #

started: bool = False

tags instance-attribute #

tags = tags

url instance-attribute #

url = asyncapi_url or url

close async #

close(exc_type: Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None) -> None

Closes the object.

PARAMETER DESCRIPTION
exc_type

The type of the exception being handled, if any.

TYPE: Optional[Type[BaseException]] DEFAULT: None

exc_val

The exception instance being handled, if any.

TYPE: Optional[BaseException] DEFAULT: None

exec_tb

The traceback of the exception being handled, if any.

TYPE: Optional[TracebackType] DEFAULT: None

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/asynchronous.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None:
    """Close every handler and then, if one exists, the broker connection.

    The three arguments mirror the context-manager exit triple and are
    passed through to the parent's ``_abc_close`` and to ``_close``.

    Args:
        exc_type: The type of the exception being handled, if any.
        exc_val: The exception instance being handled, if any.
        exec_tb: The traceback of the exception being handled, if any.

    Returns:
        None

    """
    super()._abc_close(exc_type, exc_val, exec_tb)

    # Handlers are closed one after another, in mapping order.
    for handler in self.handlers.values():
        await handler.close()

    # Nothing to tear down when no connection was ever established.
    if self._connection is None:
        return

    await self._close(exc_type, exc_val, exec_tb)

connect async #

connect(*args: Any, **kwargs: Any) -> ConnectionType

Connect to a remote server.

PARAMETER DESCRIPTION
*args

Variable length argument list.

TYPE: Any DEFAULT: ()

**kwargs

Arbitrary keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ConnectionType

The connection object.

Source code in faststream/broker/core/asynchronous.py
async def connect(self, *args: Any, **kwargs: Any) -> ConnectionType:
    """Return the broker connection, establishing it on first use.

    When a connection is already stored on the instance it is returned
    as-is and the arguments are ignored.

    Args:
        *args: Positional arguments given to ``_resolve_connection_kwargs``.
        **kwargs: Keyword arguments given to ``_resolve_connection_kwargs``.

    Returns:
        The connection object stored in ``self._connection``.

    """
    if self._connection is not None:
        return self._connection

    # Resolve the caller's arguments into the keyword set ``_connect`` expects.
    connection_kwargs = self._resolve_connection_kwargs(*args, **kwargs)
    self._connection = await self._connect(**connection_kwargs)
    return self._connection

include_router #

include_router(router: BrokerRouter[Any, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[Any, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_router(self, router: BrokerRouter[Any, MsgType]) -> None:
    """Register a router's handlers and publishers on this broker.

    Each entry in ``router._handlers`` is re-subscribed through
    ``self.subscriber`` using the entry's stored ``args``/``kwargs`` and
    ``call``. The router's publishers are then merged into a fresh mapping
    that replaces ``self._publishers``; router entries win on key clashes.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for route in router._handlers:
        register = self.subscriber(*route.args, **route.kwargs)
        register(route.call)

    # Build a new mapping instead of mutating the existing one in place.
    combined = dict(self._publishers)
    combined.update(router._publishers)
    self._publishers = combined

include_routers #

include_routers(*routers: BrokerRouter[Any, MsgType]) -> None

Includes routers in the current object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[Any, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: BrokerRouter[Any, MsgType]) -> None:
    """Include several routers, one at a time and in the order given.

    Args:
        *routers: Routers passed individually to ``include_router``.

    Returns:
        None

    """
    for router in routers:
        self.include_router(router)

publish abstractmethod async #

publish(message: SendableMessage, *args: Any, reply_to: str = '', rpc: bool = False, rpc_timeout: Optional[float] = None, raise_timeout: bool = False, **kwargs: Any) -> Optional[SendableMessage]

Publish a message.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: SendableMessage

*args

Additional arguments.

TYPE: Any DEFAULT: ()

reply_to

The reply-to address for the message.

TYPE: str DEFAULT: ''

rpc

Whether the message is for RPC.

TYPE: bool DEFAULT: False

rpc_timeout

The timeout for RPC.

TYPE: Optional[float] DEFAULT: None

raise_timeout

Whether to raise an exception on timeout.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Optional[SendableMessage]

The published message.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/asynchronous.py
@abstractmethod
async def publish(
    self,
    message: SendableMessage,
    *args: Any,
    reply_to: str = "",
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
    **kwargs: Any,
) -> Optional[SendableMessage]:
    """Publish a message (abstract; this base body always raises).

    Args:
        message: The message to be published.
        *args: Additional arguments.
        reply_to: The reply-to address for the message.
        rpc: Whether the message is for RPC.
        rpc_timeout: The timeout for RPC.
        raise_timeout: Whether to raise an exception on timeout.
        **kwargs: Additional keyword arguments.

    Returns:
        Nothing from this base body; overriding implementations return an
        ``Optional[SendableMessage]``.

    Raises:
        NotImplementedError: Always, when this base body is executed.

    """
    raise NotImplementedError

publisher abstractmethod #

publisher(key: Any, publisher: BasePublisher[MsgType]) -> BasePublisher[MsgType]

Registers a publisher under the given key.

PARAMETER DESCRIPTION
key

The key associated with the publisher.

TYPE: Any

publisher

The publisher to be registered.

TYPE: BasePublisher[MsgType]

RETURNS DESCRIPTION
BasePublisher[MsgType]

The same publisher that was passed in.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/abc.py
@abstractmethod
def publisher(
    self,
    key: Any,
    publisher: BasePublisher[MsgType],
) -> BasePublisher[MsgType]:
    """Register ``publisher`` under ``key`` and hand it back.

    ``self._publishers`` is replaced by a new mapping containing the
    previous entries plus this one; an existing entry with the same key
    is overwritten.

    Args:
        key: The key associated with the publisher.
        publisher: The publisher to be registered.

    Returns:
        The same publisher object that was passed in.

    """
    # Copy first so the previously stored mapping is never mutated.
    registry = dict(self._publishers)
    registry[key] = publisher
    self._publishers = registry
    return publisher

start abstractmethod async #

start() -> None

Start the broker async use case.

Source code in faststream/broker/core/asynchronous.py
@abstractmethod
async def start(self) -> None:
    """Run the base start-up step, refresh handler calls, then connect.

    Calls the parent's ``_abc_start``, invokes ``refresh(with_mock=False)``
    on the first element of every entry in each handler's ``calls``, and
    finally awaits ``connect()``.
    """
    super()._abc_start()

    for handler in self.handlers.values():
        # Each entry unpacks to exactly six items; only the first is used.
        for wrapped_call, _, _, _, _, _ in handler.calls:
            wrapped_call.refresh(with_mock=False)

    await self.connect()

subscriber abstractmethod #

subscriber(*broker_args: Any, retry: Union[bool, int] = False, dependencies: Sequence[Depends] = (), decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None, parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None, middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None, filter: Filter[StreamMessage[MsgType]] = default_filter, _raw: bool = False, _get_dependant: Optional[Any] = None, **broker_kwargs: Any) -> Callable[[Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]

A function decorator for subscribing to a message broker.

PARAMETER DESCRIPTION
*broker_args

Positional arguments to be passed to the message broker.

TYPE: Any DEFAULT: ()

retry

Whether to retry the subscription if it fails. Can be a boolean or an integer specifying the number of retries.

TYPE: Union[bool, int] DEFAULT: False

dependencies

Sequence of dependencies to be injected into the decorated function.

TYPE: Sequence[Depends] DEFAULT: ()

decoder

Custom decoder function for decoding the message.

TYPE: Optional[CustomDecoder[StreamMessage[MsgType]]] DEFAULT: None

parser

Custom parser function for parsing the decoded message.

TYPE: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] DEFAULT: None

middlewares

Sequence of middleware functions to be applied to the message.

TYPE: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] DEFAULT: None

filter

Filter function for filtering the messages to be processed.

TYPE: Filter[StreamMessage[MsgType]] DEFAULT: default_filter

_raw

Whether to return the raw message instead of the processed result.

TYPE: bool DEFAULT: False

_get_dependant

Optional argument to get the dependant object.

TYPE: Optional[Any] DEFAULT: None

**broker_kwargs

Keyword arguments to be passed to the message broker.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Callable[[Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]

A callable decorator that wraps the decorated function and handles the subscription.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/asynchronous.py
@override
@abstractmethod
def subscriber(  # type: ignore[override,return]
    self,
    *broker_args: Any,
    retry: Union[bool, int] = False,
    dependencies: Sequence[Depends] = (),
    decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
    parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
    middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None,
    filter: Filter[StreamMessage[MsgType]] = default_filter,
    _raw: bool = False,
    _get_dependant: Optional[Any] = None,
    **broker_kwargs: Any,
) -> Callable[
    [
        Union[
            Callable[P_HandlerParams, T_HandlerReturn],
            HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
        ]
    ],
    HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
]:
    """Declare the decorator used to subscribe a handler to the broker.

    This abstract base body reads none of its arguments: it only calls the
    parent's ``subscriber()`` with no arguments and returns nothing itself.
    The parameter descriptions below state the intended contract of the
    signature.

    Args:
        *broker_args: Positional arguments to be passed to the message broker.
        retry: Whether to retry on failure; a boolean, or an integer giving
            the number of retries.
        dependencies: Dependencies to be injected into the decorated function.
        decoder: Custom decoder function for decoding the message.
        parser: Custom parser function for parsing the message.
        middlewares: Middlewares to be applied to the message.
        filter: Filter deciding which messages the handler processes.
        _raw: Private flag; described upstream as returning the raw message
            instead of the processed result.
        _get_dependant: Private hook for obtaining the dependant object.
        **broker_kwargs: Keyword arguments to be passed to the message broker.

    Returns:
        Per the annotation, a decorator that accepts a handler function (or
        an existing ``HandlerCallWrapper``) and yields a
        ``HandlerCallWrapper``. The body shown here has no ``return``
        statement.

    """
    super().subscriber()