Skip to content

RedisBroker

faststream.redis.RedisBroker #

RedisBroker(url='redis://localhost:6379', *, host=EMPTY, port=EMPTY, db=EMPTY, connection_class=EMPTY, client_name=None, health_check_interval=0, max_connections=None, socket_timeout=None, socket_connect_timeout=None, socket_read_size=65536, socket_keepalive=False, socket_keepalive_options=None, socket_type=0, retry_on_timeout=False, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser, encoder_class=Encoder, graceful_timeout=15.0, decoder=None, parser=None, dependencies=(), middlewares=(), security=None, asyncapi_url=None, protocol=None, protocol_version='custom', description=None, tags=None, logger=EMPTY, log_level=INFO, log_fmt=None, apply_types=True, validate=True, _get_dependant=None, _call_decorators=())

Bases: RedisRegistrator, RedisLoggingBroker

Redis broker.

Source code in faststream/redis/broker/broker.py
def __init__(
    self,
    url: str = "redis://localhost:6379",
    *,
    host: str = EMPTY,
    port: Union[str, int] = EMPTY,
    db: Union[str, int] = EMPTY,
    connection_class: Type["Connection"] = EMPTY,
    client_name: Optional[str] = None,
    health_check_interval: float = 0,
    max_connections: Optional[int] = None,
    socket_timeout: Optional[float] = None,
    socket_connect_timeout: Optional[float] = None,
    socket_read_size: int = 65536,
    socket_keepalive: bool = False,
    socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
    socket_type: int = 0,
    retry_on_timeout: bool = False,
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    parser_class: Type["BaseParser"] = DefaultParser,
    encoder_class: Type["Encoder"] = Encoder,
    # broker args
    graceful_timeout: Annotated[
        Optional[float],
        Doc(
            "Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down."
        ),
    ] = 15.0,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Custom decoder object."),
    ] = None,
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Custom parser object."),
    ] = None,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies to apply to all broker subscribers."),
    ] = (),
    middlewares: Annotated[
        Sequence["BrokerMiddleware[BaseMessage]"],
        Doc("Middlewares to apply to all broker publishers/subscribers."),
    ] = (),
    # AsyncAPI args
    security: Annotated[
        Optional["BaseSecurity"],
        Doc(
            "Security options to connect broker and generate AsyncAPI server security information."
        ),
    ] = None,
    asyncapi_url: Annotated[
        Optional[str],
        Doc("AsyncAPI hardcoded server addresses. Use `servers` if not specified."),
    ] = None,
    protocol: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol."),
    ] = None,
    protocol_version: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol version."),
    ] = "custom",
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI server description."),
    ] = None,
    tags: Annotated[
        Optional[Iterable[Union["asyncapi.Tag", "asyncapi.TagDict"]]],
        Doc("AsyncAPI server tags."),
    ] = None,
    # logging args
    logger: Annotated[
        Optional["LoggerProto"],
        Doc("User specified logger to pass into Context and log service messages."),
    ] = EMPTY,
    log_level: Annotated[
        int,
        Doc("Service messages log level."),
    ] = logging.INFO,
    log_fmt: Annotated[
        Optional[str],
        Doc("Default logger log format."),
    ] = None,
    # FastDepends args
    apply_types: Annotated[
        bool,
        Doc("Whether to use FastDepends or not."),
    ] = True,
    validate: Annotated[
        bool,
        Doc("Whether to cast types using Pydantic validation."),
    ] = True,
    _get_dependant: Annotated[
        Optional[Callable[..., Any]],
        Doc("Custom library dependant generator callback."),
    ] = None,
    _call_decorators: Annotated[
        Iterable["Decorator"],
        Doc("Any custom decorator to apply to wrapped functions."),
    ] = (),
) -> None:
    self._producer = None

    if asyncapi_url is None:
        asyncapi_url = url

    if protocol is None:
        url_kwargs = urlparse(asyncapi_url)
        protocol = url_kwargs.scheme

    super().__init__(
        url=url,
        host=host,
        port=port,
        db=db,
        client_name=client_name,
        health_check_interval=health_check_interval,
        max_connections=max_connections,
        socket_timeout=socket_timeout,
        socket_connect_timeout=socket_connect_timeout,
        socket_read_size=socket_read_size,
        socket_keepalive=socket_keepalive,
        socket_keepalive_options=socket_keepalive_options,
        socket_type=socket_type,
        retry_on_timeout=retry_on_timeout,
        encoding=encoding,
        encoding_errors=encoding_errors,
        decode_responses=decode_responses,
        parser_class=parser_class,
        connection_class=connection_class,
        encoder_class=encoder_class,
        # Basic args
        # broker base
        graceful_timeout=graceful_timeout,
        dependencies=dependencies,
        decoder=decoder,
        parser=parser,
        middlewares=middlewares,
        # AsyncAPI
        description=description,
        asyncapi_url=asyncapi_url,
        protocol=protocol,
        protocol_version=protocol_version,
        security=security,
        tags=tags,
        # logging
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
        # FastDepends args
        apply_types=apply_types,
        validate=validate,
        _get_dependant=_get_dependant,
        _call_decorators=_call_decorators,
    )

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

url instance-attribute #

url

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)

    for sub in self._subscribers.values():
        sub.add_middleware(middleware)

    for pub in self._publishers.values():
        pub.add_middleware(middleware)

subscriber #

subscriber(channel=None, *, list=None, stream=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)
Source code in faststream/redis/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    channel: Annotated[
        Union["PubSub", str, None],
        Doc("Redis PubSub object name to send message."),
    ] = None,
    *,
    list: Annotated[
        Union["ListSub", str, None],
        Doc("Redis List object name to send message."),
    ] = None,
    stream: Annotated[
        Union["StreamSub", str, None],
        Doc("Redis Stream object name to send message."),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc(
            "Parser to map original **aio_pika.IncomingMessage** Msg to FastStream one."
        ),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[UnifyRedisMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[UnifyRedisMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPISubscriber:
    subscriber = cast(
        AsyncAPISubscriber,
        super().subscriber(
            create_subscriber(
                channel=channel,
                list=list,
                stream=stream,
                # subscriber args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_middlewares=self._middlewares,
                broker_dependencies=self._dependencies,
                # AsyncAPI
                title_=title,
                description_=description,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    return subscriber.add_call(
        filter_=filter,
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(channel=None, *, list=None, stream=None, headers=None, reply_to='', middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

Source code in faststream/redis/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    channel: Annotated[
        Union["PubSub", str, None],
        Doc("Redis PubSub object name to send message."),
    ] = None,
    *,
    list: Annotated[
        Union["ListSub", str, None],
        Doc("Redis List object name to send message."),
    ] = None,
    stream: Annotated[
        Union["StreamSub", str, None],
        Doc("Redis Stream object name to send message."),
    ] = None,
    headers: Annotated[
        Optional["AnyDict"],
        Doc(
            "Message headers to store metainformation. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Reply message destination PubSub object name."),
    ] = "",
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPIPublisher:
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.
    """
    return cast(
        AsyncAPIPublisher,
        super().publisher(
            AsyncAPIPublisher.create(
                channel=channel,
                list=list,
                stream=stream,
                headers=headers,
                reply_to=reply_to,
                # Specific
                broker_middlewares=self._middlewares,
                middlewares=middlewares,
                # AsyncAPI
                title_=title,
                description_=description,
                schema_=schema,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

get_fmt #

get_fmt()
Source code in faststream/redis/broker/logging.py
def get_fmt(self) -> str:
    return (
        "%(asctime)s %(levelname)-8s - "
        f"%(channel)-{self._max_channel_name}s | "
        f"%(message_id)-{self.__max_msg_id_ln}s "
        "- %(message)s"
    )

setup_subscriber #

setup_subscriber(subscriber, **kwargs)

Setup the Subscriber to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Setup the Subscriber to prepare it to starting."""
    data = self._subscriber_setup_extra.copy()
    data.update(kwargs)
    subscriber.setup(**data)

setup_publisher #

setup_publisher(publisher, **kwargs)

Setup the Publisher to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Setup the Publisher to prepare it to starting."""
    data = self._publisher_setup_extra.copy()
    data.update(kwargs)
    publisher.setup(**data)

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Closes the object."""
    self.running = False

    for h in self._subscribers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exc_tb)

connect async #

connect(url=EMPTY, **kwargs)

Connect to the Redis server.

Source code in faststream/redis/broker/broker.py
@override
async def connect(  # type: ignore[override]
    self,
    url: Optional[str] = EMPTY,
    **kwargs: "Unpack[RedisInitKwargs]",
) -> "Redis[bytes]":
    """Connect to the Redis server."""
    if url is not EMPTY:
        connect_kwargs: AnyDict = {
            "url": url,
            **kwargs,
        }
    else:
        connect_kwargs = dict(kwargs).copy()

    return await super().connect(**connect_kwargs)

start async #

start()
Source code in faststream/redis/broker/broker.py
async def start(self) -> None:
    await super().start()

    for handler in self._subscribers.values():
        self._log(
            f"`{handler.call_name}` waiting for messages",
            extra=handler.get_log_context(None),
        )
        await handler.start()

publish async #

publish(message=None, channel=None, *, reply_to='', headers=None, correlation_id=None, list=None, stream=None, maxlen=None, rpc=False, rpc_timeout=30.0, raise_timeout=False)

Publish message directly.

This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks applications or to publish messages from time to time.

Please, use @broker.publisher(...) or broker.publisher(...).publish(...) instead in a regular way.

Source code in faststream/redis/broker/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    message: Annotated[
        "SendableMessage",
        Doc("Message body to send."),
    ] = None,
    channel: Annotated[
        Optional[str],
        Doc("Redis PubSub object name to send message."),
    ] = None,
    *,
    reply_to: Annotated[
        str,
        Doc("Reply message destination PubSub object name."),
    ] = "",
    headers: Annotated[
        Optional["AnyDict"],
        Doc("Message headers to store metainformation."),
    ] = None,
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    list: Annotated[
        Optional[str],
        Doc("Redis List object name to send message."),
    ] = None,
    stream: Annotated[
        Optional[str],
        Doc("Redis Stream object name to send message."),
    ] = None,
    maxlen: Annotated[
        Optional[int],
        Doc(
            "Redis Stream maxlen publish option. "
            "Remove eldest message if maxlen exceeded."
        ),
    ] = None,
    rpc: Annotated[
        bool,
        Doc("Whether to wait for reply in blocking mode."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    rpc_timeout: Annotated[
        Optional[float],
        Doc("RPC reply waiting time."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method with `timeout` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = 30.0,
    raise_timeout: Annotated[
        bool,
        Doc(
            "Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. "
            "RPC request returns `None` at timeout by default."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "`request` always raises TimeoutError instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
) -> Optional["DecodedMessage"]:
    """Publish message directly.

    This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
    applications or to publish messages from time to time.

    Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way.
    """
    return await super().publish(
        message,
        producer=self._producer,
        correlation_id=correlation_id or gen_cor_id(),
        channel=channel,
        list=list,
        stream=stream,
        maxlen=maxlen,
        reply_to=reply_to,
        headers=headers,
        rpc=rpc,
        rpc_timeout=rpc_timeout,
        raise_timeout=raise_timeout,
    )

request async #

request(message, channel=None, *, list=None, stream=None, maxlen=None, correlation_id=None, headers=None, timeout=30.0)
Source code in faststream/redis/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    channel: Optional[str] = None,
    *,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    maxlen: Optional[int] = None,
    correlation_id: Optional[str] = None,
    headers: Optional["AnyDict"] = None,
    timeout: Optional[float] = 30.0,
) -> "RedisMessage":
    msg: RedisMessage = await super().request(
        message,
        producer=self._producer,
        correlation_id=correlation_id or gen_cor_id(),
        channel=channel,
        list=list,
        stream=stream,
        maxlen=maxlen,
        headers=headers,
        timeout=timeout,
    )
    return msg

publish_batch async #

publish_batch(*msgs, list, correlation_id=None)

Publish multiple messages to Redis List by one request.

Source code in faststream/redis/broker/broker.py
async def publish_batch(
    self,
    *msgs: Annotated[
        "SendableMessage",
        Doc("Messages bodies to send."),
    ],
    list: Annotated[
        str,
        Doc("Redis List object name to send messages."),
    ],
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
) -> None:
    """Publish multiple messages to Redis List by one request."""
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    correlation_id = correlation_id or gen_cor_id()

    call: AsyncFunc = self._producer.publish_batch

    for m in self._middlewares[::-1]:
        call = partial(m(None).publish_scope, call)

    await call(
        *msgs,
        list=list,
        correlation_id=correlation_id,
    )

ping async #

ping(timeout)
Source code in faststream/redis/broker/broker.py
@override
async def ping(self, timeout: Optional[float]) -> bool:
    sleep_time = (timeout or 10) / 10

    with move_on_after(timeout) as cancel_scope:
        if self._connection is None:
            return False

        while True:
            if cancel_scope.cancel_called:
                return False

            try:
                if await self._connection.ping():
                    return True

            except ConnectionError:
                pass

            await anyio.sleep(sleep_time)

    return False