Skip to content

RedisRouter

faststream.redis.RedisRouter #

RedisRouter(prefix='', handlers=(), *, dependencies=(), middlewares=(), parser=None, decoder=None, include_in_schema=None)

Bases: RedisRegistrator, BrokerRouter[BaseMessage]

A router that can be included into a RedisBroker.

Source code in faststream/redis/router.py
def __init__(
    self,
    prefix: Annotated[
        str,
        Doc("String prefix to add to all subscribers queues."),
    ] = "",
    handlers: Annotated[
        Iterable[RedisRoute],
        Doc("Route object to include."),
    ] = (),
    *,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc(
            "Dependencies list (`[Depends(),]`) to apply to all routers' publishers/subscribers."
        ),
    ] = (),
    middlewares: Annotated[
        Iterable["BrokerMiddleware[BaseMessage]"],
        Doc("Router middlewares to apply to all routers' publishers/subscribers."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **IncomingMessage** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    include_in_schema: Annotated[
        Optional[bool],
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = None,
) -> None:
    """Create a Redis router by delegating every option to the parent class.

    No argument is transformed here: each one is forwarded, by keyword and
    unchanged, to the parent initializer.
    """
    # Router-wide settings first, the route objects last. Everything is
    # passed by keyword, so this ordering is purely for readability.
    super().__init__(
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        parser=parser,
        decoder=decoder,
        include_in_schema=include_in_schema,
        handlers=handlers,
    )

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The new middleware will be used as the innermost of the already existing ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register one more BrokerMiddleware after all the existing ones.

    Being the last entry of the middlewares list, it acts as the innermost
    of the middlewares registered so far. The middleware is also handed to
    every known subscriber and publisher through their own `add_middleware`.
    """
    # Rebuild the collection as a fresh tuple instead of mutating it in place.
    self._middlewares = tuple(self._middlewares) + (middleware,)

    # Subscribers are updated first, publishers second.
    for registry in (self._subscribers, self._publishers):
        for endpoint in registry.values():
            endpoint.add_middleware(middleware)

subscriber #

subscriber(channel=None, *, list=None, stream=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)
Source code in faststream/redis/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    channel: Annotated[
        Union["PubSub", str, None],
        Doc("Redis PubSub object name to send message."),
    ] = None,
    *,
    list: Annotated[
        Union["ListSub", str, None],
        Doc("Redis List object name to send message."),
    ] = None,
    stream: Annotated[
        Union["StreamSub", str, None],
        Doc("Redis Stream object name to send message."),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc(
            "Parser to map original **aio_pika.IncomingMessage** Msg to FastStream one."
        ),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[UnifyRedisMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[UnifyRedisMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPISubscriber:
    """Register a Redis subscriber and attach a handler call to it.

    The subscriber is built by `create_subscriber` from the given source
    (`channel`, `list` or `stream`) together with this object's middlewares
    and dependencies, then registered through the parent `subscriber`
    method. The call-specific options are applied with `add_call`, whose
    result is returned.
    """
    new_subscriber = create_subscriber(
        channel=channel,
        list=list,
        stream=stream,
        # subscriber args
        no_ack=no_ack,
        no_reply=no_reply,
        retry=retry,
        broker_middlewares=self._middlewares,
        broker_dependencies=self._dependencies,
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )
    registered = cast(AsyncAPISubscriber, super().subscriber(new_subscriber))

    # Fall back to this object's parser/decoder when none (or a falsy one)
    # was passed for this particular subscriber.
    effective_parser = parser or self._parser
    effective_decoder = decoder or self._decoder

    return registered.add_call(
        filter_=filter,
        parser_=effective_parser,
        decoder_=effective_decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(channel=None, *, list=None, stream=None, headers=None, reply_to='', middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it later - broker.publisher(...).publish(...).

Source code in faststream/redis/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    channel: Annotated[
        Union["PubSub", str, None],
        Doc("Redis PubSub object name to send message."),
    ] = None,
    *,
    list: Annotated[
        Union["ListSub", str, None],
        Doc("Redis List object name to send message."),
    ] = None,
    stream: Annotated[
        Union["StreamSub", str, None],
        Doc("Redis Stream object name to send message."),
    ] = None,
    headers: Annotated[
        Optional["AnyDict"],
        Doc(
            "Message headers to store metainformation. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Reply message destination PubSub object name."),
    ] = "",
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPIPublisher:
    """Build a long-living publisher object that is documented in AsyncAPI.

    Two usage modes are supported:

    * as a decorator, `@broker.publisher(...)`, stacked on a handler that is
      also decorated with `@broker.subscriber(...)` - the handler's return
      value is then published;
    * as a standalone object to call later on -
      `broker.publisher(...).publish(...)`.
    """
    new_publisher = AsyncAPIPublisher.create(
        channel=channel,
        list=list,
        stream=stream,
        headers=headers,
        reply_to=reply_to,
        # Specific
        broker_middlewares=self._middlewares,
        middlewares=middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )

    # Registration is delegated to the parent class; the cast only narrows
    # the static type of whatever it hands back.
    registered = super().publisher(new_publisher)
    return cast(AsyncAPIPublisher, registered)

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object.

    Every subscriber and publisher of ``router`` receives the combined prefix
    ``self.prefix + prefix`` through its ``add_prefix`` method. Those whose
    hash is not registered in this object yet are then adopted: their schema
    flag and broker-level middlewares (and, for subscribers, dependencies)
    are updated and they are stored in this object's registries.

    Args:
        router: Object whose ``_subscribers`` and ``_publishers`` are included.
        prefix: Extra prefix appended to this object's own ``prefix``.
        dependencies: Extra dependencies, placed after this object's own and
            before the subscriber's existing ones. Applied to subscribers only.
        middlewares: Extra middlewares, placed after this object's own and
            before the existing ones of the subscriber/publisher.
        include_in_schema: When not ``None``, overrides the flag of every
            newly adopted subscriber/publisher. When ``None``, the existing
            flag is passed through ``self._solve_include_in_schema``.
    """
    # Subscribers and publishers must get the very same prefix. Publishers
    # used to receive only ``self.prefix``, silently dropping ``prefix``.
    full_prefix = "".join((self.prefix, prefix))

    for h in router._subscribers.values():
        h.add_prefix(full_prefix)

        # Only adopt subscribers that are not registered here already.
        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            # Order: this object's entries, the extra ones, the original ones.
            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            # Rebind to a new dict rather than mutating the current one.
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(full_prefix)

        # Only adopt publishers that are not registered here already.
        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Include several routers at once, in the order they were passed.

    Each router goes through `include_router` with that method's default
    options.
    """
    for router in routers:
        self.include_router(router)