Skip to content

RabbitRouter

faststream.rabbit.router.RabbitRouter #

RabbitRouter(
    prefix="",
    handlers=(),
    *,
    dependencies=(),
    middlewares=(),
    parser=None,
    decoder=None,
    include_in_schema=None,
)

Bases: RabbitRegistrator, BrokerRouter['IncomingMessage']

Includable to RabbitBroker router.

Source code in faststream/rabbit/router.py
def __init__(
    self,
    prefix: Annotated[
        str,
        Doc("String prefix to add to all subscribers queues."),
    ] = "",
    handlers: Annotated[
        Iterable[RabbitRoute],
        Doc("Route object to include."),
    ] = (),
    *,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc(
            "Dependencies list (`[Depends(),]`) to apply to all routers' publishers/subscribers."
        ),
    ] = (),
    middlewares: Annotated[
        Sequence["BrokerMiddleware[IncomingMessage]"],
        Doc("Router middlewares to apply to all routers' publishers/subscribers."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **IncomingMessage** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    include_in_schema: Annotated[
        Optional[bool],
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = None,
) -> None:
    super().__init__(
        handlers=handlers,
        # basic args
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        parser=parser,
        decoder=decoder,
        include_in_schema=include_in_schema,
    )

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)

    for sub in self._subscribers.values():
        sub.add_middleware(middleware)

    for pub in self._publishers.values():
        pub.add_middleware(middleware)

subscriber #

subscriber(
    queue,
    exchange=None,
    *,
    consume_args=None,
    reply_config=None,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    filter=default_filter,
    retry=False,
    no_ack=False,
    no_reply=False,
    title=None,
    description=None,
    include_in_schema=True,
)
Source code in faststream/rabbit/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Annotated[
        Union[str, "RabbitQueue"],
        Doc(
            "RabbitMQ queue to listen. "
            "**FastStream** declares and binds queue object to `exchange` automatically if it is not passive (by default)."
        ),
    ],
    exchange: Annotated[
        Union[str, "RabbitExchange", None],
        Doc(
            "RabbitMQ exchange to bind queue to. "
            "Uses default exchange if not presented. "
            "**FastStream** declares exchange object automatically if it is not passive (by default)."
        ),
    ] = None,
    *,
    consume_args: Annotated[
        Optional["AnyDict"],
        Doc("Extra consumer arguments to use in `queue.consume(...)` method."),
    ] = None,
    reply_config: Annotated[
        Optional["ReplyConfig"],
        Doc("Extra options to use at replies publishing."),
        deprecated(
            "Deprecated in **FastStream 0.5.16**. "
            "Please, use `RabbitResponse` object as a handler return instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **IncomingMessage** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[RabbitMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[RabbitMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        Union[bool, int],
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPISubscriber:
    subscriber = cast(
        AsyncAPISubscriber,
        super().subscriber(
            create_subscriber(
                queue=RabbitQueue.validate(queue),
                exchange=RabbitExchange.validate(exchange),
                consume_args=consume_args,
                reply_config=reply_config,
                # subscriber args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_middlewares=self._middlewares,
                broker_dependencies=self._dependencies,
                # AsyncAPI
                title_=title,
                description_=description,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    return subscriber.add_call(
        filter_=filter,
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    reply_to=None,
    priority=None,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_type=None,
    user_id=None,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

Source code in faststream/rabbit/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Annotated[
        Union["RabbitQueue", str],
        Doc("Default message routing key to publish with."),
    ] = "",
    exchange: Annotated[
        Union["RabbitExchange", str, None],
        Doc("Target exchange to publish message to."),
    ] = None,
    *,
    routing_key: Annotated[
        str,
        Doc(
            "Default message routing key to publish with. "
            "Overrides `queue` option if presented."
        ),
    ] = "",
    mandatory: Annotated[
        bool,
        Doc(
            "Client waits for confirmation that the message is placed to some queue. "
            "RabbitMQ returns message to client if there is no suitable queue."
        ),
    ] = True,
    immediate: Annotated[
        bool,
        Doc(
            "Client expects that there is consumer ready to take the message to work. "
            "RabbitMQ returns message to client if there is no suitable consumer."
        ),
    ] = False,
    timeout: Annotated[
        "TimeoutType",
        Doc("Send confirmation time from RabbitMQ."),
    ] = None,
    persist: Annotated[
        bool,
        Doc("Restore the message on RabbitMQ reboot."),
    ] = False,
    reply_to: Annotated[
        Optional[str],
        Doc(
            "Reply message routing key to send with (always sending to default exchange)."
        ),
    ] = None,
    priority: Annotated[
        Optional[int],
        Doc("The message priority (0 by default)."),
    ] = None,
    # specific
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
    # message args
    headers: Annotated[
        Optional["HeadersType"],
        Doc(
            "Message headers to store metainformation. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    content_type: Annotated[
        Optional[str],
        Doc(
            "Message **content-type** header. "
            "Used by application, not core RabbitMQ. "
            "Will be set automatically if not specified."
        ),
    ] = None,
    content_encoding: Annotated[
        Optional[str],
        Doc("Message body content encoding, e.g. **gzip**."),
    ] = None,
    expiration: Annotated[
        Optional["DateType"],
        Doc("Message expiration (lifetime) in seconds (or datetime or timedelta)."),
    ] = None,
    message_type: Annotated[
        Optional[str],
        Doc("Application-specific message type, e.g. **orders.created**."),
    ] = None,
    user_id: Annotated[
        Optional[str],
        Doc("Publisher connection User ID, validated if set."),
    ] = None,
) -> AsyncAPIPublisher:
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.
    """
    message_kwargs = PublishKwargs(
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        priority=priority,
        content_type=content_type,
        content_encoding=content_encoding,
        message_type=message_type,
        user_id=user_id,
        expiration=expiration,
    )

    publisher = cast(
        AsyncAPIPublisher,
        super().publisher(
            AsyncAPIPublisher.create(
                routing_key=routing_key,
                queue=RabbitQueue.validate(queue),
                exchange=RabbitExchange.validate(exchange),
                message_kwargs=message_kwargs,
                # Specific
                broker_middlewares=self._middlewares,
                middlewares=middlewares,
                # AsyncAPI
                title_=title,
                description_=description,
                schema_=schema,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)