Skip to content

RabbitRoute

faststream.rabbit.RabbitRoute #

RabbitRoute(call, queue, exchange=None, *, publishers=(), consume_args=None, reply_config=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)

Bases: SubscriberRoute

Class to store delayed RabbitBroker subscriber registration.

Just a copy of RabbitRegistrator.subscriber(...) arguments.

Source code in faststream/rabbit/router.py
def __init__(
    self,
    call: Annotated[
        Union[
            Callable[..., "AioPikaSendableMessage"],
            Callable[..., Awaitable["AioPikaSendableMessage"]],
        ],
        Doc(
            "Message handler function "
            "to wrap the same with `@broker.subscriber(...)` way."
        ),
    ],
    queue: Annotated[
        Union[str, "RabbitQueue"],
        Doc(
            "RabbitMQ queue to listen. "
            "**FastStream** declares and binds queue object to `exchange` automatically if it is not passive (by default)."
        ),
    ],
    exchange: Annotated[
        Union[str, "RabbitExchange", None],
        Doc(
            "RabbitMQ exchange to bind queue to. "
            "Uses default exchange if not presented. "
            "**FastStream** declares exchange object automatically if it is not passive (by default)."
        ),
    ] = None,
    *,
    publishers: Annotated[
        Iterable[RabbitPublisher],
        Doc("RabbitMQ publishers to broadcast the handler result."),
    ] = (),
    consume_args: Annotated[
        Optional["AnyDict"],
        Doc("Extra consumer arguments to use in `queue.consume(...)` method."),
    ] = None,
    reply_config: Annotated[
        Optional["ReplyConfig"],
        Doc("Extra options to use at replies publishing."),
        deprecated(
            "Deprecated in **FastStream 0.5.16**. "
            "Please, use `RabbitResponse` object as a handler return instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **IncomingMessage** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[RabbitMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[RabbitMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        Union[bool, int],
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    super().__init__(
        call,
        publishers=publishers,
        queue=queue,
        exchange=exchange,
        consume_args=consume_args,
        reply_config=reply_config,
        dependencies=dependencies,
        parser=parser,
        decoder=decoder,
        middlewares=middlewares,
        filter=filter,
        retry=retry,
        no_ack=no_ack,
        no_reply=no_reply,
        title=title,
        description=description,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs

call instance-attribute #

call = call

publishers instance-attribute #

publishers = publishers