Skip to content

RedisBroker

faststream.redis.broker.RedisBroker #

RedisBroker(
    url: str = "redis://localhost:6379",
    polling_interval: float | None = None,
    *,
    protocol: str | None = None,
    protocol_version: str | None = "custom",
    security: BaseSecurity | None = None,
    **kwargs: Any
)

Bases: RedisLoggingMixin, BrokerAsyncUsecase[AnyRedisDict, 'Redis[bytes]']

Redis broker.

Connects to a Redis server and lets you publish to, and subscribe on, Pub/Sub channels, lists, and streams.

PARAMETER DESCRIPTION
url

URL of the Redis server

DEFAULT: 'redis://localhost:6379'

polling_interval

interval in seconds to poll the Redis server for new messages (default: None)

DEFAULT: None

protocol

protocol of the Redis server; when omitted, the scheme of `url` is used (default: None)

DEFAULT: None

protocol_version

protocol version of the Redis server (default: "custom")

DEFAULT: 'custom'

security

security settings for the Redis server (default: None)

DEFAULT: None

kwargs

additional keyword arguments

DEFAULT: {}

Source code in faststream/redis/broker.py
def __init__(
    self,
    url: str = "redis://localhost:6379",
    polling_interval: Optional[float] = None,
    *,
    protocol: Optional[str] = None,
    protocol_version: Optional[str] = "custom",
    security: Optional[BaseSecurity] = None,
    **kwargs: Any,
) -> None:
    """Initialize the Redis broker.

    Args:
        url: URL of the Redis server.
        polling_interval: Interval in seconds to poll the Redis server for
            new messages. Stored as ``global_polling_interval``.
        protocol: Protocol of the Redis server. When falsy, the scheme
            parsed from the broker URL is used instead.
        protocol_version: Protocol version of the Redis server; forwarded
            to the base constructor.
        security: Security settings for the Redis server; forwarded to the
            base constructor.
        **kwargs: Additional keyword arguments forwarded to the base
            constructor.
    """
    # Broker-local state is set up before delegating to the base class.
    self._producer = None
    self.global_polling_interval = polling_interval

    super().__init__(
        url=url,
        protocol_version=protocol_version,
        security=security,
        **kwargs,
    )

    # An explicit protocol wins; otherwise fall back to the URL scheme
    # (e.g. "redis" for "redis://localhost:6379").
    scheme = urlparse(self.url).scheme
    self.protocol = protocol if protocol else scheme

dependencies instance-attribute #

dependencies: Sequence[Depends] = dependencies

description instance-attribute #

description = description

fmt property #

fmt: str

global_polling_interval instance-attribute #

global_polling_interval = polling_interval

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

handlers instance-attribute #

handlers: dict[int, Handler]

log_level instance-attribute #

log_level: int

logger instance-attribute #

logger: Optional[Logger]

middlewares instance-attribute #

middlewares: Sequence[Callable[[MsgType], BaseMiddleware]]

protocol instance-attribute #

protocol = protocol or scheme

protocol_version instance-attribute #

protocol_version = protocol_version

security instance-attribute #

security = security

started instance-attribute #

started: bool = False

tags instance-attribute #

tags = tags

url instance-attribute #

url: str

close async #

close(
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None

Closes the object.

PARAMETER DESCRIPTION
exc_type

The type of the exception being handled, if any.

TYPE: Optional[Type[BaseException]] DEFAULT: None

exc_val

The exception instance being handled, if any.

TYPE: Optional[BaseException] DEFAULT: None

exec_tb

The traceback of the exception being handled, if any.

TYPE: Optional[TracebackType] DEFAULT: None

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/asynchronous.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None:
    """Close every handler and then, if one exists, the connection.

    The three arguments mirror the context-manager exit protocol and are
    passed through unchanged to ``_abc_close`` and ``_close``.

    Args:
        exc_type: The type of the exception being handled, if any.
        exc_val: The exception instance being handled, if any.
        exec_tb: The traceback of the exception being handled, if any.

    Returns:
        None

    Raises:
        NotImplementedError: Presumably propagated from ``_close`` when it
            is not implemented -- TODO confirm against the base class.

    """
    super()._abc_close(exc_type, exc_val, exec_tb)

    # Handlers are closed sequentially, each awaited before the next.
    for handler in self.handlers.values():
        await handler.close()

    # No connection object means there is nothing further to release.
    if self._connection is None:
        return
    await self._close(exc_type, exc_val, exec_tb)

connect async #

connect(*args: Any, **kwargs: Any) -> Redis[bytes]

Connect to the Redis server.

PARAMETER DESCRIPTION
args

additional positional arguments

DEFAULT: ()

kwargs

additional keyword arguments

DEFAULT: {}

Source code in faststream/redis/broker.py
async def connect(self, *args: Any, **kwargs: Any) -> "Redis[bytes]":
    """Connect to the Redis server and share the producer with publishers.

    Args:
        *args: Additional positional arguments passed to the base
            ``connect``.
        **kwargs: Additional keyword arguments passed to the base
            ``connect``.

    Returns:
        The connection object returned by the base ``connect``.
    """
    redis_connection = await super().connect(*args, **kwargs)

    # Publishers registered before connecting get the broker's producer now.
    for registered in self._publishers.values():
        registered._producer = self._producer

    return redis_connection

include_router #

include_router(router: BrokerRouter[Any, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[Any, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_router(self, router: BrokerRouter[Any, MsgType]) -> None:
    """Register a router's handlers and publishers on the current object.

    Every entry of ``router._handlers`` is re-registered through
    ``self.subscriber`` using the entry's stored positional arguments,
    keyword arguments and callable. The router's publishers are then merged
    into a new ``self._publishers`` mapping; on a key collision the
    router's publisher replaces the existing one.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for route in router._handlers:
        register = self.subscriber(*route.args, **route.kwargs)
        register(route.call)

    # Build a fresh mapping rather than mutating the current one in place.
    combined = dict(self._publishers)
    combined.update(router._publishers)
    self._publishers = combined

include_routers #

include_routers(
    *routers: BrokerRouter[Any, MsgType]
) -> None

Includes routers in the current object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[Any, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: BrokerRouter[Any, MsgType]) -> None:
    """Include several routers, one after another.

    Each router is handed to ``include_router`` in the order given; calling
    this with no routers does nothing.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for router in routers:
        self.include_router(router)

publish async #

publish(*args: Any, **kwargs: Any) -> DecodedMessage | None
Source code in faststream/redis/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    *args: Any,
    **kwargs: Any,
) -> Optional[DecodedMessage]:
    """Publish a message through the broker's producer.

    Args:
        *args: Positional arguments passed unchanged to the producer's
            ``publish``.
        **kwargs: Keyword arguments passed unchanged to the producer's
            ``publish``.

    Returns:
        Whatever the producer's ``publish`` returns.

    Raises:
        AssertionError: If the broker has no producer yet (the check is an
            ``assert``, so it is skipped when Python runs with ``-O``).
    """
    producer = self._producer
    assert producer, NOT_CONNECTED_YET  # nosec B101
    return await producer.publish(*args, **kwargs)

publish_batch async #

publish_batch(*args: Any, **kwargs: Any) -> None
Source code in faststream/redis/broker.py
async def publish_batch(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Publish a batch of messages through the broker's producer.

    Args:
        *args: Positional arguments passed unchanged to the producer's
            ``publish_batch``.
        **kwargs: Keyword arguments passed unchanged to the producer's
            ``publish_batch``.

    Returns:
        The awaited result of the producer's ``publish_batch``.

    Raises:
        AssertionError: If the broker has no producer yet (the check is an
            ``assert``, so it is skipped when Python runs with ``-O``).
    """
    producer = self._producer
    assert producer, NOT_CONNECTED_YET  # nosec B101
    return await producer.publish_batch(*args, **kwargs)

publisher #

publisher(
    channel: Channel | PubSub | None = None,
    list: Channel | ListSub | None = None,
    stream: Channel | StreamSub | None = None,
    headers: AnyDict | None = None,
    reply_to: str = "",
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> Publisher
Source code in faststream/redis/broker.py
@override
def publisher(  # type: ignore[override]
    self,
    channel: Union[Channel, PubSub, None] = None,
    list: Union[Channel, ListSub, None] = None,
    stream: Union[Channel, StreamSub, None] = None,
    headers: Optional[AnyDict] = None,
    reply_to: str = "",
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
) -> Publisher:
    """Return the publisher for a channel, list or stream destination.

    The destination is keyed by the routing hash of the first truthy value
    among ``channel``, ``list`` and ``stream`` (in that order). If a
    publisher is already stored under that key it is returned; otherwise
    the publisher built from the given arguments is used.

    Args:
        channel: Pub/Sub channel to publish to.
        list: Redis list to publish to.
        stream: Redis stream to publish to.
        headers: Headers passed to the new publisher.
        reply_to: Reply-to value passed to the new publisher.
        title: AsyncAPI title for the publisher.
        description: AsyncAPI description for the publisher.
        schema: AsyncAPI schema for the publisher.
        include_in_schema: Whether the publisher appears in the AsyncAPI
            schema.

    Returns:
        The existing or newly created publisher.

    Raises:
        ValueError: If none of ``channel``, ``list`` or ``stream`` is given.
    """
    channel = PubSub.validate(channel)
    list = ListSub.validate(list)
    stream = StreamSub.validate(stream)

    if (target := channel or list or stream) is None:
        raise ValueError(INCORRECT_SETUP_MSG)

    routing_key = Handler.get_routing_hash(target)

    # Built unconditionally; it is only used when no publisher is stored
    # under this routing key yet.
    candidate = Publisher(
        channel=channel,
        list=list,
        stream=stream,
        headers=headers,
        reply_to=reply_to,
        # AsyncAPI
        title=title,
        _description=description,
        _schema=schema,
        include_in_schema=include_in_schema,
    )
    publisher = self._publishers.get(routing_key, candidate)
    super().publisher(routing_key, publisher)

    # With a producer already available, hand it over immediately.
    if self._producer is not None:
        publisher._producer = self._producer
    return publisher

start async #

start() -> None
Source code in faststream/redis/broker.py
async def start(self) -> None:
    """Start the broker and every registered handler.

    Publishes a default log context, runs the base ``start``, then for each
    handler: creates the consumer group for grouped stream subscriptions
    (creating the stream too, via ``mkstream=True``), logs that the handler
    is waiting, and starts it on the broker connection.

    Raises:
        AssertionError: If there is no connection after the base ``start``
            (the check is an ``assert``, so it is skipped under ``-O``).
        ResponseError: If creating a consumer group fails with a message
            that does not contain "already exists".
    """
    default_context = self._get_log_context(None, "")
    context.set_global("default_log_context", default_context)

    await super().start()
    assert self._connection, NOT_CONNECTED_YET  # nosec B101

    for handler in self.handlers.values():
        stream = handler.stream_sub
        if stream is not None and stream.group:
            try:
                await self._connection.xgroup_create(
                    name=stream.name,
                    groupname=stream.group,
                    mkstream=True,
                )
            except ResponseError as err:
                # A group that is already there is fine; anything else is
                # a real failure and is re-raised.
                if "already exists" not in str(err):
                    raise err

        log_extra = self._get_log_context(None, handler.channel_name)
        self._log(f"`{handler.call_name}` waiting for messages", extra=log_extra)
        await handler.start(self._connection)

subscriber #

subscriber(
    channel: Channel | PubSub | None = None,
    *,
    list: Channel | ListSub | None = None,
    stream: Channel | StreamSub | None = None,
    dependencies: Sequence[Depends] = (),
    parser: CustomParser[AnyRedisDict, RedisMessage]
    | None = None,
    decoder: CustomDecoder[RedisMessage] | None = None,
    middlewares: Sequence[
        Callable[[AnyRedisDict], BaseMiddleware]
    ]
    | None = None,
    filter: Filter[RedisMessage] = default_filter,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    **original_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        Any, P_HandlerParams, T_HandlerReturn
    ],
]
Source code in faststream/redis/broker.py
@override
def subscriber(  # type: ignore[override]
    self,
    channel: Union[Channel, PubSub, None] = None,
    *,
    list: Union[Channel, ListSub, None] = None,
    stream: Union[Channel, StreamSub, None] = None,
    # broker arguments
    dependencies: Sequence[Depends] = (),
    parser: Optional[CustomParser[AnyRedisDict, RedisMessage]] = None,
    decoder: Optional[CustomDecoder[RedisMessage]] = None,
    middlewares: Optional[
        Sequence[Callable[[AnyRedisDict], BaseMiddleware]]
    ] = None,
    filter: Filter[RedisMessage] = default_filter,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    include_in_schema: bool = True,
    **original_kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[Any, P_HandlerParams, T_HandlerReturn],
]:
    """Create a decorator that registers a function as a message consumer.

    Exactly one of ``channel``, ``list`` or ``stream`` must be given.
    Handlers are stored per routing hash of that source: the first
    subscription creates the handler, later subscriptions to the same source
    reuse it and only add another call to it. As a consequence ``title``,
    ``description`` and ``include_in_schema`` only take effect when the
    handler is first created.

    Args:
        channel: Pub/Sub channel to consume from.
        list: Redis list to consume from.
        stream: Redis stream to consume from.
        dependencies: Extra dependencies applied when wrapping the function.
        parser: Message parser; the broker's global parser is used when
            this is falsy.
        decoder: Message decoder; the broker's global decoder is used when
            this is falsy.
        middlewares: Middlewares attached to this call.
        filter: Filter attached to this call.
        title: AsyncAPI title for the handler.
        description: AsyncAPI description for the handler.
        include_in_schema: Whether the handler appears in the AsyncAPI
            schema.
        **original_kwargs: Extra keyword arguments passed to
            ``_wrap_handler``.

    Returns:
        A decorator that wraps the function, adds it to the handler and
        returns the wrapped call.

    Raises:
        ValueError: If no source is given, or if more than one of
            ``channel``, ``list`` and ``stream`` is given.
    """
    channel = PubSub.validate(channel)
    list = ListSub.validate(list)
    stream = StreamSub.validate(stream)

    source = channel or list or stream
    if source is None:
        raise ValueError(INCORRECT_SETUP_MSG)

    # The three source kinds are mutually exclusive.
    if channel and list:
        raise ValueError("You can't use `PubSub` and `ListSub` both")
    if channel and stream:
        raise ValueError("You can't use `PubSub` and `StreamSub` both")
    if list and stream:
        raise ValueError("You can't use `ListSub` and `StreamSub` both")

    self._setup_log_context(channel=source.name)
    super().subscriber()

    routing_key = Handler.get_routing_hash(source)

    # Built unconditionally; it is only stored when no handler exists for
    # this routing key yet.
    candidate = Handler(  # type: ignore[abstract]
        log_context_builder=partial(
            self._get_log_context,
            channel=source.name,
        ),
        graceful_timeout=self.graceful_timeout,
        # Redis
        channel=channel,
        list=list,
        stream=stream,
        # AsyncAPI
        title=title,
        description=description,
        include_in_schema=include_in_schema,
    )
    handler = self.handlers.setdefault(routing_key, candidate)

    def consumer_wrapper(
        func: Callable[P_HandlerParams, T_HandlerReturn],
    ) -> HandlerCallWrapper[
        AnyRedisDict,
        P_HandlerParams,
        T_HandlerReturn,
    ]:
        wrapped_call, dependant_model = self._wrap_handler(
            func,
            extra_dependencies=dependencies,
            **original_kwargs,
        )

        # Per-call parser/decoder take precedence over the broker-wide ones.
        handler.add_call(
            handler=wrapped_call,
            filter=filter,
            middlewares=middlewares,
            parser=parser or self._global_parser,  # type: ignore[arg-type]
            decoder=decoder or self._global_decoder,  # type: ignore[arg-type]
            dependant=dependant_model,
        )

        return wrapped_call

    return consumer_wrapper