Skip to content

BrokerUsecase

faststream.broker.core.abc.BrokerUsecase #

BrokerUsecase(
    url: Union[str, List[str]],
    *args: Any,
    protocol: Optional[str] = None,
    protocol_version: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Sequence[Union[Tag, TagDict]]] = None,
    asyncapi_url: Union[str, List[str], None] = None,
    apply_types: bool = True,
    validate: bool = True,
    logger: Optional[Logger] = access_logger,
    log_level: int = logging.INFO,
    log_fmt: Optional[
        str
    ] = "%(asctime)s %(levelname)s - %(message)s",
    dependencies: Sequence[Depends] = (),
    middlewares: Optional[
        Sequence[Callable[[MsgType], BaseMiddleware]]
    ] = None,
    decoder: Optional[
        CustomDecoder[StreamMessage[MsgType]]
    ] = None,
    parser: Optional[
        CustomParser[MsgType, StreamMessage[MsgType]]
    ] = None,
    security: Optional[BaseSecurity] = None,
    **kwargs: Any
)

Bases: ABC, Generic[MsgType, ConnectionType], LoggingMixin

A class representing a broker use case.

METHOD DESCRIPTION
__init__

constructor method

include_router

include a router in the broker

include_routers

include multiple routers in the broker

_resolve_connection_kwargs

resolve connection kwargs

_wrap_handler

wrap a handler function

_abc_start

start the broker

_abc_close

close the broker

_abc__close

close the broker connection

_process_message

process a message

subscriber

decorator to register a subscriber

publisher

register a publisher

_wrap_decode_message

wrap a message decoding function

Initialize a broker.

PARAMETER DESCRIPTION
url

The URL or list of URLs to connect to.

TYPE: Union[str, List[str]]

*args

Additional arguments.

TYPE: Any DEFAULT: ()

protocol

The protocol to use for the connection.

TYPE: Optional[str] DEFAULT: None

protocol_version

The version of the protocol.

TYPE: Optional[str] DEFAULT: None

description

A description of the broker.

TYPE: Optional[str] DEFAULT: None

tags

Tags associated with the broker.

TYPE: Optional[Sequence[Union[Tag, TagDict]]] DEFAULT: None

asyncapi_url

The URL or list of URLs to the AsyncAPI schema.

TYPE: Union[str, List[str], None] DEFAULT: None

apply_types

Whether to apply types to messages.

TYPE: bool DEFAULT: True

validate

Whether to cast types using Pydantic validation.

TYPE: bool DEFAULT: True

logger

The logger to use.

TYPE: Optional[Logger] DEFAULT: access_logger

log_level

The log level to use.

TYPE: int DEFAULT: INFO

log_fmt

The log format to use.

TYPE: Optional[str] DEFAULT: '%(asctime)s %(levelname)s - %(message)s'

dependencies

Dependencies of the broker.

TYPE: Sequence[Depends] DEFAULT: ()

middlewares

Middlewares to use.

TYPE: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] DEFAULT: None

decoder

Custom decoder for messages.

TYPE: Optional[CustomDecoder[StreamMessage[MsgType]]] DEFAULT: None

parser

Custom parser for messages.

TYPE: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] DEFAULT: None

security

Security scheme to use.

TYPE: Optional[BaseSecurity] DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in faststream/broker/core/abc.py
def __init__(
    self,
    url: Union[str, List[str]],
    *args: Any,
    # AsyncAPI kwargs
    protocol: Optional[str] = None,
    protocol_version: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Sequence[Union[asyncapi.Tag, asyncapi.TagDict]]] = None,
    asyncapi_url: Union[str, List[str], None] = None,
    # broker kwargs
    apply_types: bool = True,
    validate: bool = True,
    logger: Optional[logging.Logger] = access_logger,
    log_level: int = logging.INFO,
    log_fmt: Optional[str] = "%(asctime)s %(levelname)s - %(message)s",
    dependencies: Sequence[Depends] = (),
    middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None,
    decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
    parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
    security: Optional[BaseSecurity] = None,
    **kwargs: Any,
) -> None:
    """Initialize a broker.

    Args:
        url: The URL or list of URLs to connect to.
        *args: Additional arguments.
        protocol: The protocol to use for the connection.
        protocol_version: The version of the protocol.
        description: A description of the broker.
        tags: Tags associated with the broker.
        asyncapi_url: The URL or list of URLs to the AsyncAPI schema.
        apply_types: Whether to apply types to messages.
        validate: Whether to cast types using Pydantic validation.
        logger: The logger to use.
        log_level: The log level to use.
        log_fmt: The log format to use.
        dependencies: Dependencies of the broker.
        middlewares: Middlewares to use.
        decoder: Custom decoder for messages.
        parser: Custom parser for messages.
        security: Security scheme to use.
        **kwargs: Additional keyword arguments.

    """
    super().__init__(
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
    )

    self._connection = None
    self._is_apply_types = apply_types
    self._is_validate = validate
    self.handlers = {}
    self._publishers = {}
    empty_middleware: Sequence[Callable[[MsgType], BaseMiddleware]] = ()
    midd_args: Sequence[Callable[[MsgType], BaseMiddleware]] = (
        middlewares or empty_middleware
    )
    self.middlewares = [CriticalLogMiddleware(logger, log_level), *midd_args]
    self.dependencies = dependencies

    self._connection_args = (url, *args)
    self._connection_kwargs = kwargs

    self._global_parser = parser
    self._global_decoder = decoder

    context.set_global("logger", logger)
    context.set_global("broker", self)

    self.started = False

    # AsyncAPI information
    self.url = asyncapi_url or url
    self.protocol = protocol
    self.protocol_version = protocol_version
    self.description = description
    self.tags = tags
    self.security = security

dependencies instance-attribute #

dependencies: Sequence[Depends] = dependencies

description instance-attribute #

description = description

fmt property #

fmt: str

Getter method for _fmt attribute.

handlers instance-attribute #

handlers: Mapping[Any, BaseHandler[MsgType]] = {}

log_level instance-attribute #

log_level: int

logger instance-attribute #

logger: Optional[Logger]

middlewares instance-attribute #

middlewares: Sequence[Callable[[Any], BaseMiddleware]] = [
    CriticalLogMiddleware(logger, log_level),
    *midd_args,
]

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

security instance-attribute #

security = security

started instance-attribute #

started: bool = False

tags instance-attribute #

tags = tags

url instance-attribute #

url = asyncapi_url or url

include_router #

include_router(router: BrokerRouter[Any, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[Any, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_router(self, router: BrokerRouter[Any, MsgType]) -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for r in router._handlers:
        self.subscriber(*r.args, **r.kwargs)(r.call)

    self._publishers = {**self._publishers, **router._publishers}

include_routers #

include_routers(
    *routers: BrokerRouter[Any, MsgType]
) -> None

Includes routers in the current object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[Any, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: BrokerRouter[Any, MsgType]) -> None:
    """Includes routers in the current object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for r in routers:
        self.include_router(r)

publisher abstractmethod #

publisher(
    key: Any, publisher: BasePublisher[MsgType]
) -> BasePublisher[MsgType]

Publishes a publisher.

PARAMETER DESCRIPTION
key

The key associated with the publisher.

TYPE: Any

publisher

The publisher to be published.

TYPE: BasePublisher[MsgType]

RETURNS DESCRIPTION
BasePublisher[MsgType]

The published publisher.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/abc.py
@abstractmethod
def publisher(
    self,
    key: Any,
    publisher: BasePublisher[MsgType],
) -> BasePublisher[MsgType]:
    """Publishes a publisher.

    Args:
        key: The key associated with the publisher.
        publisher: The publisher to be published.

    Returns:
        The published publisher.

    Raises:
        NotImplementedError: If the method is not implemented.

    """
    self._publishers = {**self._publishers, key: publisher}
    return publisher

subscriber abstractmethod #

subscriber(
    *broker_args: Any,
    retry: Union[bool, int] = False,
    dependencies: Sequence[Depends] = (),
    decoder: Optional[
        CustomDecoder[StreamMessage[MsgType]]
    ] = None,
    parser: Optional[
        CustomParser[MsgType, StreamMessage[MsgType]]
    ] = None,
    middlewares: Optional[
        Sequence[
            Callable[
                [StreamMessage[MsgType]], BaseMiddleware
            ]
        ]
    ] = None,
    filter: Filter[
        StreamMessage[MsgType]
    ] = lambda: not m.processed,
    _raw: bool = False,
    _get_dependant: Optional[Any] = None,
    **broker_kwargs: Any
) -> Callable[
    [
        Union[
            Callable[P_HandlerParams, T_HandlerReturn],
            HandlerCallWrapper[
                MsgType, P_HandlerParams, T_HandlerReturn
            ],
        ]
    ],
    HandlerCallWrapper[
        MsgType, P_HandlerParams, T_HandlerReturn
    ],
]

This is a function decorator for subscribing to a message broker.

PARAMETER DESCRIPTION
*broker_args

Positional arguments to be passed to the broker.

TYPE: Any DEFAULT: ()

retry

Whether to retry the subscription if it fails. Can be a boolean or an integer specifying the number of retries.

TYPE: Union[bool, int] DEFAULT: False

dependencies

Sequence of dependencies to be injected into the handler function.

TYPE: Sequence[Depends] DEFAULT: ()

decoder

Custom decoder function to decode the message.

TYPE: Optional[CustomDecoder[StreamMessage[MsgType]]] DEFAULT: None

parser

Custom parser function to parse the decoded message.

TYPE: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] DEFAULT: None

middlewares

Sequence of middleware functions to be applied to the message.

TYPE: Optional[Sequence[Callable[[StreamMessage[MsgType]], BaseMiddleware]]] DEFAULT: None

filter

Filter function to filter the messages to be processed.

TYPE: Filter[StreamMessage[MsgType]] DEFAULT: lambda : not processed

_raw

Whether to return the raw message instead of the processed message.

TYPE: bool DEFAULT: False

_get_dependant

Optional parameter to get the dependant object.

TYPE: Optional[Any] DEFAULT: None

**broker_kwargs

Keyword arguments to be passed to the broker.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Callable[[Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]

A callable object that can be used as a decorator for a handler function.

RAISES DESCRIPTION
RuntimeWarning

If the broker is already running.

Source code in faststream/broker/core/abc.py
@abstractmethod
def subscriber(  # type: ignore[return]
    self,
    *broker_args: Any,
    retry: Union[bool, int] = False,
    dependencies: Sequence[Depends] = (),
    decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
    parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
    middlewares: Optional[
        Sequence[
            Callable[
                [StreamMessage[MsgType]],
                BaseMiddleware,
            ]
        ]
    ] = None,
    filter: Filter[StreamMessage[MsgType]] = lambda m: not m.processed,
    _raw: bool = False,
    _get_dependant: Optional[Any] = None,
    **broker_kwargs: Any,
) -> Callable[
    [
        Union[
            Callable[P_HandlerParams, T_HandlerReturn],
            HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
        ]
    ],
    HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
]:
    """This is a function decorator for subscribing to a message broker.

    Args:
        *broker_args: Positional arguments to be passed to the broker.
        retry: Whether to retry the subscription if it fails. Can be a boolean or an integer specifying the number of retries.
        dependencies: Sequence of dependencies to be injected into the handler function.
        decoder: Custom decoder function to decode the message.
        parser: Custom parser function to parse the decoded message.
        middlewares: Sequence of middleware functions to be applied to the message.
        filter: Filter function to filter the messages to be processed.
        _raw: Whether to return the raw message instead of the processed message.
        _get_dependant: Optional parameter to get the dependant object.
        **broker_kwargs: Keyword arguments to be passed to the broker.

    Returns:
        A callable object that can be used as a decorator for a handler function.

    Raises:
        RuntimeWarning: If the broker is already running.

    """
    if self.started and not is_test_env():  # pragma: no cover
        warnings.warn(
            "You are trying to register `handler` with already running broker\n"
            "It has no effect until broker restarting.",
            category=RuntimeWarning,
            stacklevel=1,
        )