Skip to content

RedisRouter

faststream.redis.router.RedisRouter #

RedisRouter(prefix: str = '', handlers: Sequence[RedisRoute] = (), *, dependencies: Sequence[Depends] = (), parser: CustomParser[AnyRedisDict, RedisMessage] | None = None, decoder: CustomDecoder[RedisMessage] | None = None, middlewares: Sequence[Callable[[AnyRedisDict], BaseMiddleware]] | None = None, include_in_schema: bool = True)

Bases: RedisRouter

A class to represent a Redis router.

Source code in faststream/redis/router.py
) -> Publisher:
    if publisher.channel is not None:
        publisher.channel = model_copy(
            publisher.channel, update={"name": prefix + publisher.channel.name}
        )
    elif publisher.list is not None:
        publisher.list = model_copy(
            publisher.list, update={"name": prefix + publisher.list.name}
        )
    elif publisher.stream is not None:
        publisher.stream = model_copy(

include_in_schema instance-attribute #

include_in_schema = include_in_schema

prefix instance-attribute #

prefix: str = prefix

include_router #

include_router(router: BrokerRouter[PublisherKeyType, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[PublisherKeyType, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_router(self, router: "BrokerRouter[PublisherKeyType, MsgType]") -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for h in router._handlers:
        self.subscriber(*h.args, **h.kwargs)(h.call)

    for p in router._publishers.values():
        p = self._update_publisher_prefix(self.prefix, p)
        key = self._get_publisher_key(p)
        self._publishers[key] = self._publishers.get(key, p)

include_routers #

include_routers(*routers: BrokerRouter[PublisherKeyType, MsgType]) -> None

Includes routers in the object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[PublisherKeyType, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_routers(
    self, *routers: "BrokerRouter[PublisherKeyType, MsgType]"
) -> None:
    """Includes routers in the object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for r in routers:
        self.include_router(r)

publisher #

publisher(channel: str | PubSub | None = None, list: str | ListSub | None = None, stream: str | StreamSub | None = None, headers: AnyDict | None = None, reply_to: str = '', title: str | None = None, description: str | None = None, schema: Any | None = None, include_in_schema: bool = True) -> Publisher
Source code in faststream/redis/router.py
@override
def publisher(  # type: ignore[override]
    self,
    channel: Union[str, PubSub, None] = None,
    list: Union[str, ListSub, None] = None,
    stream: Union[str, StreamSub, None] = None,
    headers: Optional[AnyDict] = None,
    reply_to: str = "",
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
) -> Publisher:
    if not any((stream, list, channel)):
        raise ValueError(INCORRECT_SETUP_MSG)

    new_publisher = self._update_publisher_prefix(
        self.prefix,
        Publisher(
            channel=PubSub.validate(channel),
            list=ListSub.validate(list),
            stream=StreamSub.validate(stream),
            reply_to=reply_to,
            headers=headers,
            title=title,
            _description=description,
            _schema=schema,
            include_in_schema=(
                include_in_schema
                if self.include_in_schema is None
                else self.include_in_schema
            ),
        ),
    )
    publisher_key = self._get_publisher_key(new_publisher)
    publisher = self._publishers[publisher_key] = self._publishers.get(
        publisher_key, new_publisher
    )
    return publisher

subscriber #

subscriber(channel: str | PubSub | None = None, *, list: str | ListSub | None = None, stream: str | StreamSub | None = None, dependencies: Sequence[Depends] = (), parser: CustomParser[AnyRedisDict, RedisMessage] | None = None, decoder: CustomDecoder[RedisMessage] | None = None, middlewares: Sequence[Callable[[AnyRedisDict], BaseMiddleware]] | None = None, filter: Filter[RedisMessage] = default_filter, no_ack: bool = False, title: str | None = None, description: str | None = None, include_in_schema: bool = True, **__service_kwargs: Any) -> Callable[[Callable[P_HandlerParams, T_HandlerReturn]], HandlerCallWrapper[Any, P_HandlerParams, T_HandlerReturn]]
Source code in faststream/redis/router.py
    list: Union[str, ListSub, None] = None,
    stream: Union[str, StreamSub, None] = None,
    headers: Optional[AnyDict] = None,
    reply_to: str = "",
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
) -> Publisher:
    if not any((stream, list, channel)):
        raise ValueError(INCORRECT_SETUP_MSG)

    new_publisher = self._update_publisher_prefix(
        self.prefix,
        Publisher(
            channel=PubSub.validate(channel),
            list=ListSub.validate(list),
            stream=StreamSub.validate(stream),
            reply_to=reply_to,
            headers=headers,
            title=title,
            _description=description,