Skip to content

RedisRoute

faststream.redis.RedisRoute #

RedisRoute(call, channel=None, *, publishers=(), list=None, stream=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)

Bases: SubscriberRoute

Class to store delayed RedisBroker subscriber registration.

Source code in faststream/redis/router.py
def __init__(
    self,
    call: Annotated[
        Union[
            Callable[..., "SendableMessage"],
            Callable[..., Awaitable["SendableMessage"]],
        ],
        Doc(
            "Message handler function "
            "to wrap the same with `@broker.subscriber(...)` way."
        ),
    ],
    channel: Annotated[
        Union["PubSub", str, None],
        Doc("Redis PubSub object name to consume messages from."),
    ] = None,
    *,
    publishers: Annotated[
        Iterable["RedisPublisher"],
        Doc("Redis publishers to broadcast the handler result."),
    ] = (),
    # NOTE: `list` shadows the builtin, but it is part of the public
    # keyword interface and therefore must keep its name.
    list: Annotated[
        Union["ListSub", str, None],
        Doc("Redis List object name to consume messages from."),
    ] = None,
    stream: Annotated[
        Union["StreamSub", str, None],
        Doc("Redis Stream object name to consume messages from."),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original Redis message to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[UnifyRedisMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[UnifyRedisMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whether to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    """Capture a handler and its Redis subscriber options.

    This override adds no logic of its own: ``call`` is passed positionally
    and every other argument is passed by keyword, unchanged, to the parent
    initializer. Its purpose is to expose the Redis-specific options
    (``channel``, ``list``, ``stream``) as an explicit, annotated signature.

    ``channel`` is the only option besides ``call`` that may be given
    positionally; everything after it is keyword-only.

    NOTE(review): how the parent stores these values (e.g. as ``args`` /
    ``kwargs``) and when the subscriber is actually registered is not
    visible here -- confirm against ``SubscriberRoute``.
    """
    super().__init__(
        call,
        channel=channel,
        publishers=publishers,
        list=list,
        stream=stream,
        dependencies=dependencies,
        parser=parser,
        decoder=decoder,
        middlewares=middlewares,
        filter=filter,
        retry=retry,
        no_ack=no_ack,
        no_reply=no_reply,
        title=title,
        description=description,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs

call instance-attribute #

call = call

publishers instance-attribute #

publishers = publishers