Skip to content

RabbitBroker

faststream.rabbit.broker.RabbitBroker #

RabbitBroker(
    url="amqp://guest:guest@localhost:5672/",
    *,
    host=None,
    port=None,
    virtualhost=None,
    ssl_options=None,
    client_properties=None,
    timeout=None,
    fail_fast=True,
    reconnect_interval=5.0,
    default_channel=None,
    channel_number=None,
    publisher_confirms=True,
    on_return_raises=False,
    max_consumers=None,
    app_id=SERVICE_NAME,
    graceful_timeout=None,
    decoder=None,
    parser=None,
    dependencies=(),
    middlewares=(),
    security=None,
    asyncapi_url=None,
    protocol=None,
    protocol_version="0.9.1",
    description=None,
    tags=None,
    logger=EMPTY,
    log_level=INFO,
    log_fmt=None,
    apply_types=True,
    validate=True,
    _get_dependant=None,
    _call_decorators=(),
)

Bases: RabbitRegistrator, RabbitLoggingBroker

A class to represent a RabbitMQ broker.

Initialize the RabbitBroker.

PARAMETER DESCRIPTION
url

RabbitMQ destination location to connect.

TYPE: Union[str, URL, None] DEFAULT: 'amqp://guest:guest@localhost:5672/'

host

Destination host. This option overrides url option host.

TYPE: Optional[str] DEFAULT: None

port

Destination port. This option overrides url option port.

TYPE: Optional[int] DEFAULT: None

virtualhost

RabbitMQ virtual host to use in the current broker connection.

TYPE: Optional[str] DEFAULT: None

ssl_options

Extra ssl options to establish connection.

TYPE: Optional[SSLOptions] DEFAULT: None

client_properties

Add custom client capability.

TYPE: Optional[RabbitClientProperties] DEFAULT: None

timeout

Connection establishment timeout.

TYPE: TimeoutType DEFAULT: None

fail_fast

Broker startup raises AMQPConnectionError if RabbitMQ is unreachable.

TYPE: bool DEFAULT: True

reconnect_interval

Time to sleep between reconnection attempts.

TYPE: TimeoutType DEFAULT: 5.0

default_channel

Default channel settings to use.

TYPE: Optional[Channel] DEFAULT: None

channel_number

Specify the channel number explicitly. Deprecated in FastStream 0.5.39.

TYPE: Optional[int] DEFAULT: None

publisher_confirms

If True, the publish method will return bool type after publish is complete. Otherwise, it will return None. Deprecated in FastStream 0.5.39.

TYPE: bool DEFAULT: True

on_return_raises

Raise an aio_pika.exceptions.DeliveryError when a mandatory message is returned. Deprecated in FastStream 0.5.39.

TYPE: bool DEFAULT: False

max_consumers

RabbitMQ channel qos / prefetch_count option. It limits the number of messages processed at the same time. Deprecated in FastStream 0.5.39.

TYPE: Optional[int] DEFAULT: None

app_id

Application name to mark outgoing messages by.

TYPE: Optional[str] DEFAULT: SERVICE_NAME

graceful_timeout

Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.

TYPE: Optional[float] DEFAULT: None

decoder

Custom decoder object.

TYPE: Optional[CustomCallable] DEFAULT: None

parser

Custom parser object.

TYPE: Optional[CustomCallable] DEFAULT: None

dependencies

Dependencies to apply to all broker subscribers.

TYPE: Iterable[Depends] DEFAULT: ()

middlewares

Middlewares to apply to all broker publishers/subscribers.

TYPE: Sequence[BrokerMiddleware[IncomingMessage]] DEFAULT: ()

security

Security options to connect broker and generate AsyncAPI server security information.

TYPE: Optional[BaseSecurity] DEFAULT: None

asyncapi_url

AsyncAPI hardcoded server addresses. Use servers if not specified.

TYPE: Optional[str] DEFAULT: None

protocol

AsyncAPI server protocol.

TYPE: Optional[str] DEFAULT: None

protocol_version

AsyncAPI server protocol version.

TYPE: Optional[str] DEFAULT: '0.9.1'

description

AsyncAPI server description.

TYPE: Optional[str] DEFAULT: None

tags

AsyncAPI server tags.

TYPE: Optional[Iterable[Union[Tag, TagDict]]] DEFAULT: None

logger

User-specified logger to pass into Context and log service messages.

TYPE: Optional[LoggerProto] DEFAULT: EMPTY

log_level

Service messages log level.

TYPE: int DEFAULT: INFO

log_fmt

Default logger log format.

TYPE: Optional[str] DEFAULT: None

apply_types

Whether to use FastDepends or not.

TYPE: bool DEFAULT: True

validate

Whether to cast types using Pydantic validation.

TYPE: bool DEFAULT: True

_get_dependant

Custom library dependant generator callback.

TYPE: Optional[Callable[..., Any]] DEFAULT: None

_call_decorators

Any custom decorator to apply to wrapped functions.

TYPE: Iterable[Decorator] DEFAULT: ()

Source code in faststream/rabbit/broker/broker.py
def __init__(
    self,
    url: Union[
        str, "URL", None
    ] = "amqp://guest:guest@localhost:5672/",  # pragma: allowlist secret
    *,
    host: Optional[str] = None,
    port: Optional[int] = None,
    virtualhost: Optional[str] = None,
    ssl_options: Optional["SSLOptions"] = None,
    client_properties: Optional["RabbitClientProperties"] = None,
    timeout: "TimeoutType" = None,
    fail_fast: bool = True,
    reconnect_interval: "TimeoutType" = 5.0,
    default_channel: Optional[Channel] = None,
    channel_number: Annotated[
        Optional[int],
        deprecated(
            "Deprecated in **FastStream 0.5.39**. "
            "Please, use `default_channel=Channel(channel_number=...)` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = None,
    publisher_confirms: Annotated[
        bool,
        deprecated(
            "Deprecated in **FastStream 0.5.39**. "
            "Please, use `default_channel=Channel(publisher_confirms=...)` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = True,
    on_return_raises: Annotated[
        bool,
        deprecated(
            "Deprecated in **FastStream 0.5.39**. "
            "Please, use `default_channel=Channel(on_return_raises=...)` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    max_consumers: Annotated[
        Optional[int],
        deprecated(
            "Deprecated in **FastStream 0.5.39**. "
            "Please, use `default_channel=Channel(prefetch_count=...)` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = None,
    app_id: Optional[str] = SERVICE_NAME,
    graceful_timeout: Optional[float] = None,
    decoder: Optional["CustomCallable"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Depends"] = (),
    middlewares: Sequence["BrokerMiddleware[IncomingMessage]"] = (),
    security: Optional["BaseSecurity"] = None,
    asyncapi_url: Optional[str] = None,
    protocol: Optional[str] = None,
    protocol_version: Optional[str] = "0.9.1",
    description: Optional[str] = None,
    tags: Optional[Iterable[Union["asyncapi.Tag", "asyncapi.TagDict"]]] = None,
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    log_fmt: Optional[str] = None,
    apply_types: bool = True,
    validate: bool = True,
    _get_dependant: Optional[Callable[..., Any]] = None,
    _call_decorators: Iterable["Decorator"] = (),
) -> None:
    """Initialize the RabbitBroker.

    The connection URL is assembled by `build_url` from `url`, the explicit
    `host` / `port` / `virtualhost` overrides and the credentials / SSL flag
    extracted from `security`. The resulting URL is also used as the AsyncAPI
    server URL unless `asyncapi_url` is given.

    Args:
        url: RabbitMQ destination location to connect.
        host: Destination host. This option overrides `url` option host.
        port: Destination port. This option overrides `url` option port.
        virtualhost: RabbitMQ virtual host to use in the current broker connection.
        ssl_options: Extra ssl options to establish connection.
        client_properties: Add custom client capability.
        timeout: Connection establishment timeout.
        fail_fast: Broker startup raises `AMQPConnectionError` if RabbitMQ is unreachable.
        reconnect_interval: Time to sleep between reconnection attempts.
        default_channel: Default channel settings to use. When a (truthy) value is
            passed, the deprecated `channel_number`, `publisher_confirms`,
            `on_return_raises` and `max_consumers` arguments are ignored.
        channel_number: Specify the channel number explicitly. Deprecated in **FastStream 0.5.39**.
        publisher_confirms: If `True`, the `publish` method will return `bool` type after publish is complete.
            Otherwise, it will return `None`. Deprecated in **FastStream 0.5.39**.
        on_return_raises: Raise an `aio_pika.exceptions.DeliveryError` when a mandatory message is returned.
            Deprecated in **FastStream 0.5.39**.
        max_consumers: RabbitMQ channel `qos` / `prefetch_count` option. It limits the number of
            messages processed at the same time. Deprecated in **FastStream 0.5.39**.
        app_id: Application name to mark outgoing messages by.
        graceful_timeout: Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.
        decoder: Custom decoder object.
        parser: Custom parser object.
        dependencies: Dependencies to apply to all broker subscribers.
        middlewares: Middlewares to apply to all broker publishers/subscribers.
        security: Security options to connect broker and generate AsyncAPI server security information.
        asyncapi_url: AsyncAPI hardcoded server addresses. Use `servers` if not specified.
        protocol: AsyncAPI server protocol. Defaults to the scheme of the AsyncAPI URL.
        protocol_version: AsyncAPI server protocol version.
        description: AsyncAPI server description.
        tags: AsyncAPI server tags.
        logger: User-specified logger to pass into Context and log service messages.
        log_level: Service messages log level.
        log_fmt: Default logger log format.
        apply_types: Whether to use FastDepends or not.
        validate: Whether to cast types using Pydantic validation.
        _get_dependant: Custom library dependant generator callback.
        _call_decorators: Any custom decorator to apply to wrapped functions.
    """
    # Connection-related options (login, password, ssl, ssl_context) are read
    # from the parsed security object below.
    security_args = parse_security(security)

    amqp_url = build_url(
        url,
        host=host,
        port=port,
        virtualhost=virtualhost,
        ssl_options=ssl_options,
        client_properties=client_properties,
        login=security_args.get("login"),
        password=security_args.get("password"),
        ssl=security_args.get("ssl"),
    )

    # Without an explicit AsyncAPI URL, document the real connection URL.
    if asyncapi_url is None:
        asyncapi_url = str(amqp_url)

    # respect asyncapi_url argument scheme
    built_asyncapi_url = urlparse(asyncapi_url)
    # The virtual host is taken from the path component of the AsyncAPI URL.
    self.virtual_host = built_asyncapi_url.path
    if protocol is None:
        protocol = built_asyncapi_url.scheme

    # An explicit `default_channel` wins; otherwise the channel settings are
    # built from the deprecated per-argument options.
    channel_settings = default_channel or Channel(
        channel_number=channel_number,
        publisher_confirms=publisher_confirms,
        on_return_raises=on_return_raises,
        prefetch_count=max_consumers,
    )

    super().__init__(
        url=str(amqp_url),
        ssl_context=security_args.get("ssl_context"),
        timeout=timeout,
        fail_fast=fail_fast,
        reconnect_interval=reconnect_interval,
        # channel args
        channel_settings=channel_settings,
        # Basic args
        graceful_timeout=graceful_timeout,
        dependencies=dependencies,
        decoder=decoder,
        parser=parser,
        middlewares=middlewares,
        # AsyncAPI args
        description=description,
        asyncapi_url=asyncapi_url,
        # `None` was already replaced by the URL scheme above; the `or` also
        # falls back to the scheme when an empty string was passed.
        protocol=protocol or built_asyncapi_url.scheme,
        protocol_version=protocol_version,
        security=security,
        tags=tags,
        # Logging args
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
        # FastDepends args
        apply_types=apply_types,
        validate=validate,
        _get_dependant=_get_dependant,
        _call_decorators=_call_decorators,
    )

    self.app_id = app_id

    # NOTE(review): both start unset; presumably assigned once a connection
    # is established — confirm against `connect`.
    self._channel = None
    self.declarer = None

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

url instance-attribute #

url

declarer instance-attribute #

declarer = None

virtual_host instance-attribute #

virtual_host = path

app_id instance-attribute #

app_id = app_id

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The new middleware will be used as the innermost of the already existing ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register one more BrokerMiddleware after the existing ones.

    The middleware is appended to the broker's own middleware tuple and then
    propagated to every already registered subscriber and publisher.

    Args:
        middleware: The middleware to append.
    """
    self._middlewares = tuple(self._middlewares) + (middleware,)

    for subscriber in self._subscribers.values():
        subscriber.add_middleware(middleware)

    for publisher in self._publishers.values():
        publisher.add_middleware(middleware)

subscriber #

subscriber(
    queue,
    exchange=None,
    *,
    consume_args=None,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    channel=None,
    reply_config=None,
    filter=default_filter,
    retry=False,
    no_ack=False,
    no_reply=False,
    title=None,
    description=None,
    include_in_schema=True,
)

Declares RabbitMQ subscriber object and binds it to the exchange.

You can use it as a handler decorator - @broker.subscriber(...). Or you can create a subscriber object to call it later - broker.subscriber(...).

PARAMETER DESCRIPTION
queue

RabbitMQ queue to listen. FastStream declares and binds queue object to exchange automatically if it is not passive (by default).

TYPE: Union[str, RabbitQueue]

exchange

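RabbitMQ exchange to bind queue to. Uses default exchange if not presented. FastStream declares exchange object automatically if it is not passive (by default).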

TYPE: Union[str, RabbitExchange, None] DEFAULT: None

consume_args

Extra consumer arguments to use in queue.consume(...) method.

TYPE: Optional[AnyDict] DEFAULT: None

channel

Channel to use for consuming messages. If not specified, a default channel will be used.

TYPE: Optional[Channel] DEFAULT: None

reply_config

Extra options to use at replies publishing.

TYPE: Optional[ReplyConfig] DEFAULT: None

dependencies

Dependencies list ([Depends(),]) to apply to the subscriber.

TYPE: Iterable[Depends] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[RabbitMessage]] DEFAULT: ()

filter

Overload subscriber to consume various messages from the same source.

TYPE: Filter[RabbitMessage] DEFAULT: default_filter

retry

Whether to nack message at processing exception.

TYPE: Union[bool, int] DEFAULT: False

no_ack

Whether to disable FastStream autoacknowledgement logic or not.

TYPE: bool DEFAULT: False

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

title

AsyncAPI subscriber object title.

TYPE: Optional[str] DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

Source code in faststream/rabbit/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Union[str, "RabbitQueue"],
    exchange: Union[str, "RabbitExchange", None] = None,
    *,
    consume_args: Optional["AnyDict"] = None,
    dependencies: Iterable["Depends"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    middlewares: Sequence["SubscriberMiddleware[RabbitMessage]"] = (),
    channel: Optional["Channel"] = None,
    reply_config: Annotated[
        Optional["ReplyConfig"],
        deprecated(
            "Deprecated in **FastStream 0.5.16**. "
            "Please, use `RabbitResponse` object as a handler return instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = None,
    filter: Annotated[
        "Filter[RabbitMessage]",
        deprecated(
            "Deprecated in **FastStream 0.5.0**. Please, create `subscriber` object "
            "and use it explicitly instead. Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Union[bool, int] = False,
    no_ack: bool = False,
    no_reply: bool = False,
    title: Optional[str] = None,
    description: Optional[str] = None,
    include_in_schema: bool = True,
) -> AsyncAPISubscriber:
    """Declares RabbitMQ subscriber object and binds it to the exchange.

    You can use it as a handler decorator - `@broker.subscriber(...)`.
    Or you can create a subscriber object to call it later - `broker.subscriber(...)`.

    Args:
        queue: RabbitMQ queue to listen. **FastStream** declares and binds
            queue object to `exchange` automatically if it is not passive (by default).
        exchange: RabbitMQ exchange to bind queue to. Uses default exchange
            if not presented. **FastStream** declares exchange object automatically
            if it is not passive (by default).
        consume_args: Extra consumer arguments to use in `queue.consume(...)` method.
        channel: Channel to use for consuming messages. If not specified, a default channel will be used.
        reply_config: Extra options to use at replies publishing.
            Deprecated in **FastStream 0.5.16**.
        dependencies: Dependencies list (`[Depends(),]`) to apply to the subscriber.
        parser: Parser to map original **IncomingMessage** Msg to FastStream one.
            Falls back to the broker-level parser when not given.
        decoder: Function to decode FastStream msg bytes body to python objects.
            Falls back to the broker-level decoder when not given.
        middlewares: Subscriber middlewares to wrap incoming message processing.
        filter: Overload subscriber to consume various messages from the same source.
            Deprecated in **FastStream 0.5.0**.
        retry: Whether to `nack` message at processing exception.
        no_ack: Whether to disable **FastStream** autoacknowledgement logic or not.
        no_reply: Whether to disable **FastStream** RPC and Reply To auto responses or not.
        title: AsyncAPI subscriber object title.
        description: AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.

    Returns:
        The result of `add_call(...)` on the registered subscriber object.
    """
    # Build the subscriber and register it through the parent class;
    # `cast` only narrows the static type of the value returned by `super()`.
    subscriber = cast(
        "AsyncAPISubscriber",
        super().subscriber(
            create_subscriber(
                # plain strings / None are normalized by the `validate` helpers
                queue=RabbitQueue.validate(queue),
                exchange=RabbitExchange.validate(exchange),
                consume_args=consume_args,
                reply_config=reply_config,
                channel=channel,
                # subscriber args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_middlewares=self._middlewares,
                broker_dependencies=self._dependencies,
                # AsyncAPI
                title_=title,
                description_=description,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    # Per-call parser/decoder take priority; the broker-level ones are used
    # only when the argument is falsy.
    return subscriber.add_call(
        filter_=filter,
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    reply_to=None,
    priority=None,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_type=None,
    user_id=None,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it later - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
queue

Default message routing key to publish with.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

Target exchange to publish message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

Default message routing key to publish with. Overrides queue option if presented.

TYPE: str DEFAULT: ''

mandatory

Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue.

TYPE: bool DEFAULT: True

immediate

Client expects that there is a consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer.

TYPE: bool DEFAULT: False

timeout

Send confirmation time from RabbitMQ.

TYPE: TimeoutType DEFAULT: None

persist

Restore the message on RabbitMQ reboot.

TYPE: bool DEFAULT: False

reply_to

Reply message routing key to send with (always sending to default exchange).

TYPE: Optional[str] DEFAULT: None

priority

The message priority (0 by default).

TYPE: Optional[int] DEFAULT: None

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

AsyncAPI publisher object title.

TYPE: Optional[str] DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: Optional[str] DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Optional[Any] DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

headers

Message headers to store meta-information. Can be overridden by publish.headers if specified.

TYPE: Optional[HeadersType] DEFAULT: None

content_type

Message content-type header. Used by application, not core RabbitMQ. Will be set automatically if not specified.

TYPE: Optional[str] DEFAULT: None

content_encoding

Message body content encoding, e.g. gzip.

TYPE: Optional[str] DEFAULT: None

expiration

Message expiration (lifetime) in seconds (or datetime or timedelta).

TYPE: Optional[DateType] DEFAULT: None

message_type

Application-specific message type, e.g. orders.created.

TYPE: Optional[str] DEFAULT: None

user_id

Publisher connection User ID, validated if set.

TYPE: Optional[str] DEFAULT: None

Source code in faststream/rabbit/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: bool = False,
    reply_to: Optional[str] = None,
    priority: Optional[int] = None,
    middlewares: Sequence["PublisherMiddleware"] = (),
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    expiration: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
) -> AsyncAPIPublisher:
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it later - `broker.publisher(...).publish(...)`.

    Args:
        queue: Default message routing key to publish with.
        exchange: Target exchange to publish message to.
        routing_key: Default message routing key to publish with.
            Overrides `queue` option if presented.
        mandatory: Client waits for confirmation that the message is placed
            to some queue. RabbitMQ returns message to client if there is no suitable queue.
        immediate: Client expects that there is a consumer ready to take the message to work.
            RabbitMQ returns message to client if there is no suitable consumer.
        timeout: Send confirmation time from RabbitMQ.
        persist: Restore the message on RabbitMQ reboot.
        reply_to: Reply message routing key to send with (always sending to default exchange).
        priority: The message priority (0 by default).
        middlewares: Publisher middlewares to wrap outgoing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type. Should be any python-native
            object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.
        headers: Message headers to store meta-information. Can be overridden
            by `publish.headers` if specified.
        content_type: Message **content-type** header. Used by application, not core RabbitMQ.
            Will be set automatically if not specified.
        content_encoding: Message body content encoding, e.g. **gzip**.
        expiration: Message expiration (lifetime) in seconds (or datetime or timedelta).
        message_type: Application-specific message type, e.g. **orders.created**.
        user_id: Publisher connection User ID, validated if set.

    Returns:
        The publisher object registered on this broker.
    """
    # Collect the per-message defaults into a single object handed to the publisher.
    message_kwargs = PublishKwargs(
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        priority=priority,
        content_type=content_type,
        content_encoding=content_encoding,
        message_type=message_type,
        user_id=user_id,
        expiration=expiration,
    )

    # Build the publisher and register it through the parent class;
    # `cast` only narrows the static type of the value returned by `super()`.
    publisher = cast(
        "AsyncAPIPublisher",
        super().publisher(
            AsyncAPIPublisher.create(
                routing_key=routing_key,
                # plain strings / None are normalized by the `validate` helpers
                queue=RabbitQueue.validate(queue),
                exchange=RabbitExchange.validate(exchange),
                message_kwargs=message_kwargs,
                # Specific
                broker_middlewares=self._middlewares,
                middlewares=middlewares,
                # AsyncAPI
                title_=title,
                description_=description,
                schema_=schema,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)
Source code in faststream/rabbit/broker/registrator.py
@override
def include_router(
    self,
    router: "RabbitRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[IncomingMessage]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Include a RabbitMQ router into this object.

    Args:
        router: Router to include; must be a `RabbitRegistrator` instance.
        prefix: Forwarded unchanged to the parent `include_router`.
        dependencies: Forwarded unchanged to the parent `include_router`.
        middlewares: Forwarded unchanged to the parent `include_router`.
        include_in_schema: Forwarded unchanged to the parent `include_router`.

    Raises:
        SetupError: If `router` is not a `RabbitRegistrator` instance.
    """
    if isinstance(router, RabbitRegistrator):
        # Type is fine: delegate the actual inclusion to the parent class.
        super().include_router(
            router,
            prefix=prefix,
            dependencies=dependencies,
            middlewares=middlewares,
            include_in_schema=include_in_schema,
        )
        return

    router_type_name = type(router).__name__
    raise SetupError(
        "Router must be an instance of RabbitRegistrator, "
        f"got {router_type_name} instead"
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Include every given router, in the order passed.

    Each router is forwarded to ``include_router`` with default options.
    """
    for router in routers:
        self.include_router(router)

get_fmt #

get_fmt()
Source code in faststream/rabbit/broker/logging.py
def get_fmt(self) -> str:
    """Build the logging format string for broker log records.

    The exchange, queue and message-id columns are left-aligned and padded
    to the widths currently stored on the instance.
    """
    prefix = "%(asctime)s %(levelname)-8s - "
    exchange_column = f"%(exchange)-{self._max_exchange_len}s | "
    queue_column = f"%(queue)-{self._max_queue_len}s | "
    message_id_column = f"%(message_id)-{self.__max_msg_id_ln}s "
    suffix = "- %(message)s"
    return prefix + exchange_column + queue_column + message_id_column + suffix

setup_subscriber #

setup_subscriber(subscriber, **kwargs)

Set up the Subscriber to prepare it for starting.

Source code in faststream/broker/core/usecase.py
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Set up a single subscriber so it is ready to start.

    The broker-level extra setup options are merged with ``kwargs``
    (``kwargs`` win on key clashes) and passed to ``subscriber.setup``.
    The broker-level options themselves are not modified.
    """
    setup_options = {**self._subscriber_setup_extra, **kwargs}
    subscriber.setup(**setup_options)

setup_publisher #

setup_publisher(publisher, **kwargs)

Set up the Publisher to prepare it for starting.

Source code in faststream/broker/core/usecase.py
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Set up a single publisher so it is ready to start.

    The broker-level extra setup options are merged with ``kwargs``
    (``kwargs`` win on key clashes) and passed to ``publisher.setup``.
    The broker-level options themselves are not modified.
    """
    setup_options = {**self._publisher_setup_extra, **kwargs}
    publisher.setup(**setup_options)

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Stop the broker: close subscribers, then the connection if any.

    Args:
        exc_type: Exception type forwarded to ``_close``.
        exc_val: Exception instance forwarded to ``_close``.
        exc_tb: Traceback forwarded to ``_close``.
    """
    self.running = False

    # Subscribers are closed one at a time, in registration order.
    for subscriber in self._subscribers.values():
        await subscriber.close()

    # Nothing more to do when no connection was ever stored.
    if self._connection is None:
        return

    await self._close(exc_type, exc_val, exc_tb)

connect async #

connect(
    url=EMPTY,
    *,
    host=None,
    port=None,
    virtualhost=None,
    ssl_options=None,
    client_properties=None,
    security=None,
    timeout=None,
    fail_fast=EMPTY,
    reconnect_interval=EMPTY,
    default_channel=None,
    channel_number=EMPTY,
    publisher_confirms=EMPTY,
    on_return_raises=EMPTY,
)

Connect broker object to RabbitMQ.

To start up subscribers as well, you should use broker.start() after (or instead of) this method.

PARAMETER DESCRIPTION
url

RabbitMQ destination location to connect.

TYPE: Union[str, URL, None] DEFAULT: EMPTY

host

Destination host. This option overrides url option host.

TYPE: Optional[str] DEFAULT: None

port

Destination port. This option overrides url option port.

TYPE: Optional[int] DEFAULT: None

virtualhost

RabbitMQ virtual host to use in the current broker connection.

TYPE: Optional[str] DEFAULT: None

ssl_options

Extra ssl options to establish connection.

TYPE: Optional[SSLOptions] DEFAULT: None

client_properties

Add custom client capability.

TYPE: Optional[RabbitClientProperties] DEFAULT: None

security

Security options to connect broker and generate AsyncAPI server security information.

TYPE: Optional[BaseSecurity] DEFAULT: None

timeout

Connection establishment timeout.

TYPE: TimeoutType DEFAULT: None

fail_fast

Broker startup raises AMQPConnectionError if RabbitMQ is unreachable.

TYPE: bool DEFAULT: EMPTY

reconnect_interval

Time to sleep between reconnection attempts.

TYPE: TimeoutType DEFAULT: EMPTY

default_channel

Default channel settings to use.

TYPE: Optional[Channel] DEFAULT: None

channel_number

Specify the channel number explicitly.

TYPE: Optional[int] DEFAULT: EMPTY

publisher_confirms

If True, the publish method will return bool type after publish is complete. Otherwise, it will return None.

TYPE: bool DEFAULT: EMPTY

on_return_raises

Raise an aio_pika.exceptions.DeliveryError when a mandatory message is returned.

TYPE: bool DEFAULT: EMPTY

Source code in faststream/rabbit/broker/broker.py
@override
async def connect(  # type: ignore[override]
    self,
    url: Union[str, "URL", None] = EMPTY,
    *,
    host: Optional[str] = None,
    port: Optional[int] = None,
    virtualhost: Optional[str] = None,
    ssl_options: Optional["SSLOptions"] = None,
    client_properties: Optional["RabbitClientProperties"] = None,
    security: Optional["BaseSecurity"] = None,
    timeout: "TimeoutType" = None,
    fail_fast: bool = EMPTY,
    reconnect_interval: "TimeoutType" = EMPTY,
    # channel args
    default_channel: Optional[Channel] = None,
    channel_number: Annotated[
        Optional[int],
        deprecated(
            "Deprecated in **FastStream 0.5.39**. "
            "Please, use `default_channel=Channel(channel_number=...)` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = EMPTY,
    publisher_confirms: Annotated[
        bool,
        deprecated(
            "Deprecated in **FastStream 0.5.39**. "
            "Please, use `default_channel=Channel(publisher_confirms=...)` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = EMPTY,
    on_return_raises: Annotated[
        bool,
        deprecated(
            "Deprecated in **FastStream 0.5.39**. "
            "Please, use `default_channel=Channel(on_return_raises=...)` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = EMPTY,
) -> "RobustConnection":
    """Connect the broker object to RabbitMQ.

    To start the subscribers as well, call `broker.start()` after (or instead of)
    this method.

    Only options that were actually supplied are forwarded to the parent
    `connect`; arguments left at `EMPTY` (or falsy, for `timeout` and
    `default_channel`) are omitted from the call.

    Args:
        url: RabbitMQ destination location to connect.
        host: Destination host. This option overrides `url` option host.
        port: Destination port. This option overrides `url` option port.
        virtualhost: RabbitMQ virtual host to use in the current broker connection.
        ssl_options: Extra ssl options to establish connection.
        client_properties: Add custom client capability.
        security: Security options to connect broker and generate AsyncAPI server
            security information.
        timeout: Connection establishment timeout.
        fail_fast: Broker startup raises `AMQPConnectionError` if RabbitMQ is
            unreachable.
        reconnect_interval: Time to sleep between reconnection attempts.
        default_channel: Default channel settings to use. When given, the
            deprecated channel arguments below are ignored.
        channel_number: Deprecated. Specify the channel number explicitly.
        publisher_confirms: Deprecated. If `True`, the `publish` method returns
            a `bool` after the publish is complete. Otherwise it returns `None`.
        on_return_raises: Deprecated. Raise an
            `aio_pika.exceptions.DeliveryError` when a mandatory message is
            returned.

    Returns:
        The connection object produced by the parent `connect` call.
    """
    # The deprecated per-argument channel options are folded into a `Channel`
    # only when no explicit `default_channel` was given and at least one of
    # them was supplied by the caller.
    legacy_channel_args_given = not (
        channel_number is EMPTY
        and publisher_confirms is EMPTY
        and on_return_raises is EMPTY
    )
    if not default_channel and legacy_channel_args_given:
        default_channel = Channel(
            channel_number=channel_number if channel_number is not EMPTY else None,
            publisher_confirms=(
                publisher_confirms if publisher_confirms is not EMPTY else True
            ),
            on_return_raises=(
                on_return_raises if on_return_raises is not EMPTY else False
            ),
        )

    connect_options: AnyDict = {}

    if default_channel:
        connect_options["channel_settings"] = default_channel

    if timeout:
        connect_options["timeout"] = timeout

    if fail_fast is not EMPTY:
        connect_options["fail_fast"] = fail_fast

    if reconnect_interval is not EMPTY:
        connect_options["reconnect_interval"] = reconnect_interval

    if url is EMPTY:
        url = None

    # A URL is (re)built only when the caller passed some piece of
    # connection information.
    location_parts = (
        host,
        port,
        virtualhost,
        ssl_options,
        client_properties,
        security,
    )
    if url or any(location_parts):
        security_args = parse_security(security)

        connect_options["url"] = build_url(
            url,
            host=host,
            port=port,
            virtualhost=virtualhost,
            ssl_options=ssl_options,
            client_properties=client_properties,
            login=security_args.get("login"),
            password=security_args.get("password"),
            ssl=security_args.get("ssl"),
        )

        ssl_context = security_args.get("ssl_context")
        if ssl_context:
            connect_options["ssl_context"] = ssl_context

    return await super().connect(**connect_options)

start async #

start()

Connect broker to RabbitMQ and startup all subscribers.

Source code in faststream/rabbit/broker/broker.py
async def start(self) -> None:
    """Connect broker to RabbitMQ and startup all subscribers.

    After the parent `start` completes, every publisher exchange that is not
    `None` is declared, then each registered subscriber is logged as waiting
    for messages and started.
    """
    await super().start()

    assert self.declarer, NOT_CONNECTED_YET  # nosec B101

    # Publishers without an exchange have nothing to declare.
    for pub in self._publishers.values():
        if pub.exchange is None:
            continue
        await self.declare_exchange(pub.exchange)

    for sub in self._subscribers.values():
        waiting_message = f"`{sub.call_name}` waiting for messages"
        self._log(waiting_message, extra=sub.get_log_context(None))
        await sub.start()

publish async #

publish(
    message=None,
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    reply_to=None,
    rpc=False,
    rpc_timeout=30.0,
    raise_timeout=False,
    correlation_id=None,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_id=None,
    timestamp=None,
    message_type=None,
    user_id=None,
    priority=None,
)

Publish message directly.

This method allows you to publish a message in a way that is not documented by AsyncAPI. You can use it in applications built on other frameworks, or to publish messages only occasionally.

For regular use, prefer @broker.publisher(...) or broker.publisher(...).publish(...) instead.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: AioPikaSendableMessage DEFAULT: None

queue

Message routing key to publish with.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

Target exchange to publish message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

Message routing key to publish with. Overrides queue option if presented.

TYPE: str DEFAULT: ''

mandatory

Client waits for confirmation that the message is placed in some queue. RabbitMQ returns the message to the client if there is no suitable queue.

TYPE: bool DEFAULT: True

immediate

Client expects that there is a consumer ready to take the message to work. RabbitMQ returns the message to the client if there is no suitable consumer.

TYPE: bool DEFAULT: False

timeout

Send confirmation time from RabbitMQ.

TYPE: TimeoutType DEFAULT: None

persist

Restore the message on RabbitMQ reboot.

TYPE: bool DEFAULT: False

reply_to

Reply message routing key to send with (always sending to default exchange).

TYPE: Optional[str] DEFAULT: None

rpc

Whether to wait for reply in blocking mode.

TYPE: bool DEFAULT: False

rpc_timeout

RPC reply waiting time.

TYPE: Optional[float] DEFAULT: 30.0

raise_timeout

Whether to raise TimeoutError or return None at rpc_timeout.

TYPE: bool DEFAULT: False

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: Optional[str] DEFAULT: None

headers

Message headers to store metainformation.

TYPE: Optional[HeadersType] DEFAULT: None

content_type

Message content-type header. Used by application, not core RabbitMQ. Will be set automatically if not specified.

TYPE: Optional[str] DEFAULT: None

content_encoding

Message body content encoding, e.g. gzip.

TYPE: Optional[str] DEFAULT: None

expiration

Message expiration (lifetime) in seconds (or datetime or timedelta).

TYPE: Optional[DateType] DEFAULT: None

message_id

Arbitrary message id. Generated automatically if not presented.

TYPE: Optional[str] DEFAULT: None

timestamp

Message publish timestamp. Generated automatically if not presented.

TYPE: Optional[DateType] DEFAULT: None

message_type

Application-specific message type, e.g. orders.created.

TYPE: Optional[str] DEFAULT: None

user_id

Publisher connection User ID, validated if set.

TYPE: Optional[str] DEFAULT: None

priority

The message priority (0 by default).

TYPE: Optional[int] DEFAULT: None

Source code in faststream/rabbit/broker/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "AioPikaSendableMessage" = None,
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: bool = False,
    reply_to: Optional[str] = None,
    rpc: Annotated[
        bool,
        deprecated(
            "Deprecated in **FastStream 0.5.17**. Please, use `request` method instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    rpc_timeout: Annotated[
        Optional[float],
        deprecated(
            "Deprecated in **FastStream 0.5.17**. Please, use `request` method with `timeout` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = 30.0,
    raise_timeout: Annotated[
        bool,
        deprecated(
            "Deprecated in **FastStream 0.5.17**. `request` always raises TimeoutError instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    # message args
    correlation_id: Optional[str] = None,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    expiration: Optional["DateType"] = None,
    message_id: Optional[str] = None,
    timestamp: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
    priority: Optional[int] = None,
) -> Optional[Any]:
    """Publish message directly.

    This method allows you to publish a message in a way that is not documented
    by AsyncAPI. You can use it in applications built on other frameworks, or to
    publish messages only occasionally.

    For regular use, prefer `@broker.publisher(...)` or
    `broker.publisher(...).publish(...)` instead.

    Args:
        message: Message body to send.
        queue: Message routing key to publish with.
        exchange: Target exchange to publish message to.
        routing_key: Message routing key to publish with. Overrides `queue`
            option if present.
        mandatory: Client waits for confirmation that the message is placed in
            some queue. RabbitMQ returns the message to the client if there is
            no suitable queue.
        immediate: Client expects that there is a consumer ready to take the
            message to work. RabbitMQ returns the message to the client if
            there is no suitable consumer.
        timeout: Send confirmation time from RabbitMQ.
        persist: Restore the message on RabbitMQ reboot.
        reply_to: Reply message routing key to send with (always sending to
            default exchange).
        rpc: Deprecated. Whether to wait for reply in blocking mode.
        rpc_timeout: Deprecated. RPC reply waiting time.
        raise_timeout: Deprecated. Whether to raise `TimeoutError` or return
            `None` at **rpc_timeout**.
        correlation_id: Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages. Generated
            via `gen_cor_id()` when not given.
        headers: Message headers to store metainformation.
        content_type: Message **content-type** header. Used by application,
            not core RabbitMQ. Will be set automatically if not specified.
        content_encoding: Message body content encoding, e.g. **gzip**.
        expiration: Message expiration (lifetime) in seconds (or datetime or
            timedelta).
        message_id: Arbitrary message id. Generated automatically if not
            provided.
        timestamp: Message publish timestamp. Generated automatically if not
            provided.
        message_type: Application-specific message type, e.g.
            **orders.created**.
        user_id: Publisher connection User ID, validated if set.
        priority: The message priority (0 by default).

    Returns:
        Whatever the parent `publish` call returns.
    """
    # An explicit routing key wins; otherwise derive it from the queue.
    if routing_key:
        routing = routing_key
    else:
        routing = RabbitQueue.validate(queue).routing

    if not correlation_id:
        correlation_id = gen_cor_id()

    return await super().publish(
        message,
        producer=self._producer,
        # destination
        exchange=exchange,
        routing_key=routing,
        # delivery behaviour
        mandatory=mandatory,
        immediate=immediate,
        persist=persist,
        timeout=timeout,
        reply_to=reply_to,
        # message properties
        app_id=self.app_id,
        correlation_id=correlation_id,
        headers=headers,
        content_type=content_type,
        content_encoding=content_encoding,
        expiration=expiration,
        message_id=message_id,
        timestamp=timestamp,
        message_type=message_type,
        user_id=user_id,
        priority=priority,
        # deprecated RPC options
        rpc=rpc,
        rpc_timeout=rpc_timeout,
        raise_timeout=raise_timeout,
    )

request async #

request(
    message=None,
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    correlation_id=None,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_id=None,
    timestamp=None,
    message_type=None,
    user_id=None,
    priority=None,
)

Make a synchronous request to RabbitMQ.

This method uses Direct Reply-To pattern to send a message and wait for a reply. It is a blocking call and will wait for a reply until timeout.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: AioPikaSendableMessage DEFAULT: None

queue

Message routing key to publish with.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

Target exchange to publish message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

Message routing key to publish with. Overrides queue option if presented.

TYPE: str DEFAULT: ''

mandatory

Client waits for confirmation that the message is placed in some queue. RabbitMQ returns the message to the client if there is no suitable queue.

TYPE: bool DEFAULT: True

immediate

Client expects that there is a consumer ready to take the message to work. RabbitMQ returns the message to the client if there is no suitable consumer.

TYPE: bool DEFAULT: False

timeout

Send confirmation time from RabbitMQ.

TYPE: TimeoutType DEFAULT: None

persist

Restore the message on RabbitMQ reboot.

TYPE: bool DEFAULT: False

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: Optional[str] DEFAULT: None

headers

Message headers to store metainformation.

TYPE: Optional[HeadersType] DEFAULT: None

content_type

Message content-type header. Used by application, not core RabbitMQ. Will be set automatically if not specified.

TYPE: Optional[str] DEFAULT: None

content_encoding

Message body content encoding, e.g. gzip.

TYPE: Optional[str] DEFAULT: None

expiration

Message expiration (lifetime) in seconds (or datetime or timedelta).

TYPE: Optional[DateType] DEFAULT: None

message_id

Arbitrary message id. Generated automatically if not presented.

TYPE: Optional[str] DEFAULT: None

timestamp

Message publish timestamp. Generated automatically if not presented.

TYPE: Optional[DateType] DEFAULT: None

message_type

Application-specific message type, e.g. orders.created.

TYPE: Optional[str] DEFAULT: None

user_id

Publisher connection User ID, validated if set.

TYPE: Optional[str] DEFAULT: None

priority

The message priority (0 by default).

TYPE: Optional[int] DEFAULT: None

Source code in faststream/rabbit/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "AioPikaSendableMessage" = None,
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: bool = False,
    # message args
    correlation_id: Optional[str] = None,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    expiration: Optional["DateType"] = None,
    message_id: Optional[str] = None,
    timestamp: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
    priority: Optional[int] = None,
) -> "RabbitMessage":
    """Make a synchronous request to RabbitMQ.

    This method uses Direct Reply-To pattern to send a message and wait for a
    reply. It is a blocking call and will wait for a reply until timeout.

    Args:
        message: Message body to send.
        queue: Message routing key to publish with.
        exchange: Target exchange to publish message to.
        routing_key: Message routing key to publish with. Overrides `queue`
            option if present.
        mandatory: Client waits for confirmation that the message is placed in
            some queue. RabbitMQ returns the message to the client if there is
            no suitable queue.
        immediate: Client expects that there is a consumer ready to take the
            message to work. RabbitMQ returns the message to the client if
            there is no suitable consumer.
        timeout: Send confirmation time from RabbitMQ.
        persist: Restore the message on RabbitMQ reboot.
        correlation_id: Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages. Generated
            via `gen_cor_id()` when not given.
        headers: Message headers to store metainformation.
        content_type: Message **content-type** header. Used by application,
            not core RabbitMQ. Will be set automatically if not specified.
        content_encoding: Message body content encoding, e.g. **gzip**.
        expiration: Message expiration (lifetime) in seconds (or datetime or
            timedelta).
        message_id: Arbitrary message id. Generated automatically if not
            provided.
        timestamp: Message publish timestamp. Generated automatically if not
            provided.
        message_type: Application-specific message type, e.g.
            **orders.created**.
        user_id: Publisher connection User ID, validated if set.
        priority: The message priority (0 by default).

    Returns:
        The reply message produced by the parent `request` call.
    """
    # An explicit routing key wins; otherwise derive it from the queue.
    if routing_key:
        routing = routing_key
    else:
        routing = RabbitQueue.validate(queue).routing

    if not correlation_id:
        correlation_id = gen_cor_id()

    reply: RabbitMessage = await super().request(
        message,
        producer=self._producer,
        # destination
        exchange=exchange,
        routing_key=routing,
        # delivery behaviour
        mandatory=mandatory,
        immediate=immediate,
        persist=persist,
        timeout=timeout,
        # message properties
        app_id=self.app_id,
        correlation_id=correlation_id,
        headers=headers,
        content_type=content_type,
        content_encoding=content_encoding,
        expiration=expiration,
        message_id=message_id,
        timestamp=timestamp,
        message_type=message_type,
        user_id=user_id,
        priority=priority,
    )
    return reply

declare_queue async #

declare_queue(queue)

Declares queue object in RabbitMQ.

Source code in faststream/rabbit/broker/broker.py
async def declare_queue(
    self,
    queue: Annotated["RabbitQueue", Doc("Queue object to create.")],
) -> "RobustQueue":
    """Declare a queue object in **RabbitMQ**.

    The call is delegated to the broker's declarer, so the broker must
    already be connected.

    Args:
        queue: Queue object to create.

    Returns:
        The result of the declarer's `declare_queue` call.
    """
    assert self.declarer, NOT_CONNECTED_YET  # nosec B101
    declared_queue = await self.declarer.declare_queue(queue)
    return declared_queue

declare_exchange async #

declare_exchange(exchange)

Declares exchange object in RabbitMQ.

Source code in faststream/rabbit/broker/broker.py
async def declare_exchange(
    self,
    exchange: Annotated["RabbitExchange", Doc("Exchange object to create.")],
) -> "RobustExchange":
    """Declare an exchange object in **RabbitMQ**.

    The call is delegated to the broker's declarer, so the broker must
    already be connected.

    Args:
        exchange: Exchange object to create.

    Returns:
        The result of the declarer's `declare_exchange` call.
    """
    assert self.declarer, NOT_CONNECTED_YET  # nosec B101
    declared_exchange = await self.declarer.declare_exchange(exchange)
    return declared_exchange

ping async #

ping(timeout)
Source code in faststream/rabbit/broker/broker.py
@override
async def ping(self, timeout: Optional[float]) -> bool:
    """Report whether the broker connection is up, waiting up to `timeout`.

    The connection's `connected` event is polled inside an
    `anyio.move_on_after(timeout)` scope, sleeping one tenth of `timeout`
    (or one second when `timeout` is falsy) between checks.

    Args:
        timeout: Value passed to `anyio.move_on_after` as the waiting limit.

    Returns:
        `True` as soon as the connection reports itself connected; `False`
        if there is no connection object or the scope is cancelled first.
    """
    poll_interval = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as scope:
        if self._connection is None:
            return False

        while not scope.cancel_called:
            if self._connection.connected.is_set():
                return True

            await anyio.sleep(poll_interval)

    # Reached when the scope was cancelled, either between checks or
    # while sleeping.
    return False