Skip to content

BrokerUsecase

faststream.broker.core.usecase.BrokerUsecase #

BrokerUsecase(*, decoder, parser, dependencies, middlewares, graceful_timeout, default_logger, logger, log_level, log_fmt, apply_types, validate, _get_dependant, _call_decorators, protocol, protocol_version, description, tags, asyncapi_url, security, **connection_kwargs)

Bases: LoggingBroker[MsgType], SetupAble, Generic[MsgType, ConnectionType]

A class representing a broker async use case.

Source code in faststream/broker/core/usecase.py
def __init__(
    self,
    *,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Custom decoder object."),
    ],
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Custom parser object."),
    ],
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies to apply to all broker subscribers."),
    ],
    middlewares: Annotated[
        Sequence["BrokerMiddleware[MsgType]"],
        Doc("Middlewares to apply to all broker publishers/subscribers."),
    ],
    graceful_timeout: Annotated[
        Optional[float],
        Doc(
            "Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down."
        ),
    ],
    # Logging args
    default_logger: Annotated[
        logging.Logger,
        Doc("Logger object to use if `logger` is not set."),
    ],
    logger: Annotated[
        Optional["LoggerProto"],
        Doc("User specified logger to pass into Context and log service messages."),
    ],
    log_level: Annotated[
        int,
        Doc("Service messages log level."),
    ],
    log_fmt: Annotated[
        Optional[str],
        Doc("Default logger log format."),
    ],
    # FastDepends args
    apply_types: Annotated[
        bool,
        Doc("Whether to use FastDepends or not."),
    ],
    validate: Annotated[
        bool,
        Doc("Whether to cast types using Pydantic validation."),
    ],
    _get_dependant: Annotated[
        Optional[Callable[..., Any]],
        Doc("Custom library dependant generator callback."),
    ],
    _call_decorators: Annotated[
        Iterable["Decorator"],
        Doc("Any custom decorator to apply to wrapped functions."),
    ],
    # AsyncAPI kwargs
    protocol: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol."),
    ],
    protocol_version: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol version."),
    ],
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI server description."),
    ],
    tags: Annotated[
        Optional[Iterable[Union["Tag", "TagDict"]]],
        Doc("AsyncAPI server tags."),
    ],
    asyncapi_url: Annotated[
        Union[str, List[str]],
        Doc("AsyncAPI hardcoded server addresses."),
    ],
    security: Annotated[
        Optional["BaseSecurity"],
        Doc(
            "Security options to connect broker and generate AsyncAPI server security."
        ),
    ],
    **connection_kwargs: Any,
) -> None:
    super().__init__(
        middlewares=middlewares,
        dependencies=dependencies,
        decoder=cast(
            Optional["AsyncCustomCallable"],
            to_async(decoder) if decoder else None,
        ),
        parser=cast(
            Optional["AsyncCustomCallable"],
            to_async(parser) if parser else None,
        ),
        # Broker is a root router
        include_in_schema=True,
        prefix="",
        # Logging args
        default_logger=default_logger,
        log_level=log_level,
        log_fmt=log_fmt,
        logger=logger,
    )

    self.running = False
    self.graceful_timeout = graceful_timeout

    self._connection_kwargs = connection_kwargs
    self._connection = None
    self._producer = None

    # TODO: remove useless middleware filter
    if not is_test_env():
        self._middlewares = (
            CriticalLogMiddleware(self.logger, log_level),
            *self._middlewares,
        )

    # TODO: move this context to Handlers' extra_context to support multiple brokers
    context.set_global("logger", self.logger)
    context.set_global("broker", self)

    # FastDepends args
    self._is_apply_types = apply_types
    self._is_validate = validate
    self._get_dependant = _get_dependant
    self._call_decorators = _call_decorators

    # AsyncAPI information
    self.url = asyncapi_url
    self.protocol = protocol
    self.protocol_version = protocol_version
    self.description = description
    self.tags = tags
    self.security = security

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

url instance-attribute #

url = asyncapi_url

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)

    for sub in self._subscribers.values():
        sub.add_middleware(middleware)

    for pub in self._publishers.values():
        pub.add_middleware(middleware)

subscriber abstractmethod #

subscriber(subscriber)
Source code in faststream/broker/core/abc.py
@abstractmethod
def subscriber(
    self,
    subscriber: "SubscriberProto[MsgType]",
) -> "SubscriberProto[MsgType]":
    subscriber.add_prefix(self.prefix)
    key = hash(subscriber)
    subscriber = self._subscribers.get(key, subscriber)
    self._subscribers = {**self._subscribers, key: subscriber}
    return subscriber

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

get_fmt abstractmethod #

get_fmt()

Fallback method to get log format if log_fmt if not specified.

Source code in faststream/broker/core/logging.py
@abstractmethod
def get_fmt(self) -> str:
    """Fallback method to get log format if `log_fmt` if not specified."""
    raise NotImplementedError()

start abstractmethod async #

start()

Start the broker async use case.

Source code in faststream/broker/core/usecase.py
@abstractmethod
async def start(self) -> None:
    """Start the broker async use case."""
    self._abc_start()
    await self.connect()

connect async #

connect(**kwargs)

Connect to a remote server.

Source code in faststream/broker/core/usecase.py
async def connect(self, **kwargs: Any) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        connection_kwargs = self._connection_kwargs.copy()
        connection_kwargs.update(kwargs)
        self._connection = await self._connect(**connection_kwargs)
    self.setup()
    return self._connection

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

setup_subscriber #

setup_subscriber(subscriber, **kwargs)

Setup the Subscriber to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Setup the Subscriber to prepare it to starting."""
    data = self._subscriber_setup_extra.copy()
    data.update(kwargs)
    subscriber.setup(**data)

setup_publisher #

setup_publisher(publisher, **kwargs)

Setup the Publisher to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Setup the Publisher to prepare it to starting."""
    data = self._publisher_setup_extra.copy()
    data.update(kwargs)
    publisher.setup(**data)

publisher #

publisher(*args, **kwargs)
Source code in faststream/broker/core/usecase.py
def publisher(self, *args: Any, **kwargs: Any) -> "PublisherProto[MsgType]":
    pub = super().publisher(*args, **kwargs)
    if self.running:
        self.setup_publisher(pub)
    return pub

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Closes the object."""
    self.running = False

    for h in self._subscribers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exc_tb)

publish async #

publish(msg, *, producer, correlation_id=None, **kwargs)

Publish message directly.

Source code in faststream/broker/core/usecase.py
async def publish(
    self,
    msg: Any,
    *,
    producer: Optional["ProducerProto"],
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional[Any]:
    """Publish message directly."""
    assert producer, NOT_CONNECTED_YET  # nosec B101

    publish = producer.publish

    for m in self._middlewares[::-1]:
        publish = partial(m(None).publish_scope, publish)

    return await publish(msg, correlation_id=correlation_id, **kwargs)

request async #

request(msg, *, producer, correlation_id=None, **kwargs)

Publish message directly.

Source code in faststream/broker/core/usecase.py
async def request(
    self,
    msg: Any,
    *,
    producer: Optional["ProducerProto"],
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Any:
    """Publish message directly."""
    assert producer, NOT_CONNECTED_YET  # nosec B101

    request = producer.request
    for m in self._middlewares[::-1]:
        request = partial(m(None).publish_scope, request)

    published_msg = await request(
        msg,
        correlation_id=correlation_id,
        **kwargs,
    )

    async with AsyncExitStack() as stack:
        return_msg = return_input
        for m in self._middlewares[::-1]:
            mid = m(published_msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)

        parsed_msg: StreamMessage[Any] = await producer._parser(published_msg)
        parsed_msg._decoded_body = await producer._decoder(parsed_msg)
        parsed_msg._source_type = SourceType.Response
        return await return_msg(parsed_msg)

ping abstractmethod async #

ping(timeout)

Check connection alive.

Source code in faststream/broker/core/usecase.py
@abstractmethod
async def ping(self, timeout: Optional[float]) -> bool:
    """Check connection alive."""
    raise NotImplementedError()