Skip to content

RabbitBroker

faststream.rabbit.RabbitBroker #

RabbitBroker(url='amqp://guest:guest@localhost:5672/', *, host=None, port=None, virtualhost=None, ssl_options=None, client_properties=None, timeout=None, fail_fast=True, reconnect_interval=5.0, channel_number=None, publisher_confirms=True, on_return_raises=False, max_consumers=None, app_id=SERVICE_NAME, graceful_timeout=None, decoder=None, parser=None, dependencies=(), middlewares=(), security=None, asyncapi_url=None, protocol=None, protocol_version='0.9.1', description=None, tags=None, logger=EMPTY, log_level=INFO, log_fmt=None, apply_types=True, validate=True, _get_dependant=None, _call_decorators=())

Bases: RabbitRegistrator, RabbitLoggingBroker

A class to represent a RabbitMQ broker.

Source code in faststream/rabbit/broker/broker.py
def __init__(
    self,
    url: Annotated[
        Union[str, "URL", None],
        Doc("RabbitMQ destination location to connect."),
    ] = "amqp://guest:guest@localhost:5672/",  # pragma: allowlist secret
    *,
    # connection args
    host: Annotated[
        Optional[str],
        Doc("Destination host. This option overrides `url` option host."),
    ] = None,
    port: Annotated[
        Optional[int],
        Doc("Destination port. This option overrides `url` option port."),
    ] = None,
    virtualhost: Annotated[
        Optional[str],
        Doc("RabbitMQ virtual host to use in the current broker connection."),
    ] = None,
    ssl_options: Annotated[
        Optional["SSLOptions"],
        Doc("Extra ssl options to establish connection."),
    ] = None,
    client_properties: Annotated[
        Optional["FieldTable"],
        Doc("Add custom client capability."),
    ] = None,
    timeout: Annotated[
        "TimeoutType",
        Doc("Connection establishement timeout."),
    ] = None,
    fail_fast: Annotated[
        bool,
        Doc(
            "Broker startup raises `AMQPConnectionError` if RabbitMQ is unreachable."
        ),
    ] = True,
    reconnect_interval: Annotated[
        "TimeoutType",
        Doc("Time to sleep between reconnection attempts."),
    ] = 5.0,
    # channel args
    channel_number: Annotated[
        Optional[int],
        Doc("Specify the channel number explicit."),
    ] = None,
    publisher_confirms: Annotated[
        bool,
        Doc(
            "if `True` the `publish` method will "
            "return `bool` type after publish is complete. "
            "Otherwise it will returns `None`."
        ),
    ] = True,
    on_return_raises: Annotated[
        bool,
        Doc(
            "raise an :class:`aio_pika.exceptions.DeliveryError` "
            "when mandatory message will be returned"
        ),
    ] = False,
    # broker args
    max_consumers: Annotated[
        Optional[int],
        Doc(
            "RabbitMQ channel `qos` option. "
            "It limits max messages processing in the same time count."
        ),
    ] = None,
    app_id: Annotated[
        Optional[str],
        Doc("Application name to mark outgoing messages by."),
    ] = SERVICE_NAME,
    # broker base args
    graceful_timeout: Annotated[
        Optional[float],
        Doc(
            "Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down."
        ),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Custom decoder object."),
    ] = None,
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Custom parser object."),
    ] = None,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies to apply to all broker subscribers."),
    ] = (),
    middlewares: Annotated[
        Iterable["BrokerMiddleware[IncomingMessage]"],
        Doc("Middlewares to apply to all broker publishers/subscribers."),
    ] = (),
    # AsyncAPI args
    security: Annotated[
        Optional["BaseSecurity"],
        Doc(
            "Security options to connect broker and generate AsyncAPI server security information."
        ),
    ] = None,
    asyncapi_url: Annotated[
        Optional[str],
        Doc("AsyncAPI hardcoded server addresses. Use `servers` if not specified."),
    ] = None,
    protocol: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol."),
    ] = None,
    protocol_version: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol version."),
    ] = "0.9.1",
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI server description."),
    ] = None,
    tags: Annotated[
        Optional[Iterable[Union["asyncapi.Tag", "asyncapi.TagDict"]]],
        Doc("AsyncAPI server tags."),
    ] = None,
    # logging args
    logger: Annotated[
        Optional["LoggerProto"],
        Doc("User specified logger to pass into Context and log service messages."),
    ] = EMPTY,
    log_level: Annotated[
        int,
        Doc("Service messages log level."),
    ] = logging.INFO,
    log_fmt: Annotated[
        Optional[str],
        Doc("Default logger log format."),
    ] = None,
    # FastDepends args
    apply_types: Annotated[
        bool,
        Doc("Whether to use FastDepends or not."),
    ] = True,
    validate: Annotated[
        bool,
        Doc("Whether to cast types using Pydantic validation."),
    ] = True,
    _get_dependant: Annotated[
        Optional[Callable[..., Any]],
        Doc("Custom library dependant generator callback."),
    ] = None,
    _call_decorators: Annotated[
        Iterable["Decorator"],
        Doc("Any custom decorator to apply to wrapped functions."),
    ] = (),
) -> None:
    """Initialize the RabbitMQ broker.

    Builds the connection URL with `build_url` from `url`, the explicit
    connection arguments and the parsed `security` settings, derives the
    AsyncAPI server address/protocol from it when they are not given, and
    forwards all options to the parent initializer.

    Besides the parent state, the following attributes are set here:
    `virtual_host`, `_max_consumers`, `app_id`, `_channel` and `declarer`
    (the last two start as `None`).
    """
    security_args = parse_security(security)

    amqp_url = build_url(
        url,
        host=host,
        port=port,
        virtualhost=virtualhost,
        ssl_options=ssl_options,
        client_properties=client_properties,
        login=security_args.get("login"),
        password=security_args.get("password"),
        ssl=security_args.get("ssl"),
    )

    # Without an explicit AsyncAPI address, document the real connection URL.
    if asyncapi_url is None:
        asyncapi_url = str(amqp_url)

    # respect asyncapi_url argument scheme
    builded_asyncapi_url = urlparse(asyncapi_url)
    # The virtual host is taken from the path of the AsyncAPI address.
    self.virtual_host = builded_asyncapi_url.path

    super().__init__(
        url=str(amqp_url),
        ssl_context=security_args.get("ssl_context"),
        timeout=timeout,
        fail_fast=fail_fast,
        reconnect_interval=reconnect_interval,
        # channel args
        channel_number=channel_number,
        publisher_confirms=publisher_confirms,
        on_return_raises=on_return_raises,
        # Basic args
        graceful_timeout=graceful_timeout,
        dependencies=dependencies,
        decoder=decoder,
        parser=parser,
        middlewares=middlewares,
        # AsyncAPI args
        description=description,
        asyncapi_url=asyncapi_url,
        # A missing (or empty) protocol falls back to the address scheme.
        protocol=protocol or builded_asyncapi_url.scheme,
        protocol_version=protocol_version,
        security=security,
        tags=tags,
        # Logging args
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
        # FastDepends args
        apply_types=apply_types,
        validate=validate,
        _get_dependant=_get_dependant,
        _call_decorators=_call_decorators,
    )

    self._max_consumers = max_consumers

    self.app_id = app_id

    self._channel = None
    self.declarer = None

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

url instance-attribute #

url

declarer instance-attribute #

declarer = None

virtual_host instance-attribute #

virtual_host = path

app_id instance-attribute #

app_id = app_id

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The new middleware becomes the innermost one: all previously added middlewares wrap it.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register one more broker-level middleware.

    The middleware is placed after the ones already stored on the broker and
    is then handed to `add_middleware` of every known subscriber and
    publisher.
    """
    # A new tuple is built instead of mutating the stored sequence in place.
    self._middlewares = tuple(self._middlewares) + (middleware,)

    for subscriber in self._subscribers.values():
        subscriber.add_middleware(middleware)

    for publisher in self._publishers.values():
        publisher.add_middleware(middleware)

subscriber #

subscriber(queue, exchange=None, *, consume_args=None, reply_config=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)
Source code in faststream/rabbit/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Annotated[
        Union[str, "RabbitQueue"],
        Doc(
            "RabbitMQ queue to listen. "
            "**FastStream** declares and binds queue object to `exchange` automatically if it is not passive (by default)."
        ),
    ],
    exchange: Annotated[
        Union[str, "RabbitExchange", None],
        Doc(
            "RabbitMQ exchange to bind queue to. "
            "Uses default exchange if not presented. "
            "**FastStream** declares exchange object automatically if it is not passive (by default)."
        ),
    ] = None,
    *,
    consume_args: Annotated[
        Optional["AnyDict"],
        Doc("Extra consumer arguments to use in `queue.consume(...)` method."),
    ] = None,
    reply_config: Annotated[
        Optional["ReplyConfig"],
        Doc("Extra options to use at replies publishing."),
        deprecated(
            "Deprecated in **FastStream 0.5.16**. "
            "Please, use `RabbitResponse` object as a handler return instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **IncomingMessage** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[RabbitMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[RabbitMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        Union[bool, int],
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPISubscriber:
    """Create a RabbitMQ subscriber, register it and attach call options.

    The subscriber is built by `create_subscriber` from the validated queue
    and exchange, registered through the parent `subscriber` method, and the
    result of its `add_call(...)` is returned. Broker-level parser and decoder
    are used when no subscriber-specific ones are given.
    """
    rabbit_queue = RabbitQueue.validate(queue)
    rabbit_exchange = RabbitExchange.validate(exchange)

    new_subscriber = create_subscriber(
        queue=rabbit_queue,
        exchange=rabbit_exchange,
        consume_args=consume_args,
        reply_config=reply_config,
        # subscriber args
        no_ack=no_ack,
        no_reply=no_reply,
        retry=retry,
        broker_middlewares=self._middlewares,
        broker_dependencies=self._dependencies,
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )

    # `cast` only narrows the static type of the registered object.
    registered = cast(AsyncAPISubscriber, super().subscriber(new_subscriber))

    # Fall back to the broker-wide parser/decoder when none is passed here.
    effective_parser = parser or self._parser
    effective_decoder = decoder or self._decoder

    return registered.add_call(
        filter_=filter,
        parser_=effective_parser,
        decoder_=effective_decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(queue='', exchange=None, *, routing_key='', mandatory=True, immediate=False, timeout=None, persist=False, reply_to=None, priority=None, middlewares=(), title=None, description=None, schema=None, include_in_schema=True, headers=None, content_type=None, content_encoding=None, expiration=None, message_type=None, user_id=None)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object and call it later - broker.publisher(...).publish(...).

Source code in faststream/rabbit/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Annotated[
        Union["RabbitQueue", str],
        Doc("Default message routing key to publish with."),
    ] = "",
    exchange: Annotated[
        Union["RabbitExchange", str, None],
        Doc("Target exchange to publish message to."),
    ] = None,
    *,
    routing_key: Annotated[
        str,
        Doc(
            "Default message routing key to publish with. "
            "Overrides `queue` option if presented."
        ),
    ] = "",
    mandatory: Annotated[
        bool,
        Doc(
            "Client waits for confirmation that the message is placed to some queue. "
            "RabbitMQ returns message to client if there is no suitable queue."
        ),
    ] = True,
    immediate: Annotated[
        bool,
        Doc(
            "Client expects that there is consumer ready to take the message to work. "
            "RabbitMQ returns message to client if there is no suitable consumer."
        ),
    ] = False,
    timeout: Annotated[
        "TimeoutType",
        Doc("Send confirmation time from RabbitMQ."),
    ] = None,
    persist: Annotated[
        bool,
        Doc("Restore the message on RabbitMQ reboot."),
    ] = False,
    reply_to: Annotated[
        Optional[str],
        Doc(
            "Reply message routing key to send with (always sending to default exchange)."
        ),
    ] = None,
    priority: Annotated[
        Optional[int],
        Doc("The message priority (0 by default)."),
    ] = None,
    # specific
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
    # message args
    headers: Annotated[
        Optional["HeadersType"],
        Doc(
            "Message headers to store metainformation. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    content_type: Annotated[
        Optional[str],
        Doc(
            "Message **content-type** header. "
            "Used by application, not core RabbitMQ. "
            "Will be set automatically if not specified."
        ),
    ] = None,
    content_encoding: Annotated[
        Optional[str],
        Doc("Message body content encoding, e.g. **gzip**."),
    ] = None,
    expiration: Annotated[
        Optional["DateType"],
        Doc("Message expiration (lifetime) in seconds (or datetime or timedelta)."),
    ] = None,
    message_type: Annotated[
        Optional[str],
        Doc("Application-specific message type, e.g. **orders.created**."),
    ] = None,
    user_id: Annotated[
        Optional[str],
        Doc("Publisher connection User ID, validated if set."),
    ] = None,
) -> AsyncAPIPublisher:
    """Create and register a long-living, AsyncAPI-documented publisher.

    The returned object can decorate a handler (which must also be decorated
    with `@broker.subscriber(...)`); the handler return value is then
    published. It can also be kept and used later through
    `broker.publisher(...).publish(...)`.
    """
    # Per-message defaults that are stored on the publisher object.
    default_message_options = PublishKwargs(
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        priority=priority,
        content_type=content_type,
        content_encoding=content_encoding,
        message_type=message_type,
        user_id=user_id,
        expiration=expiration,
    )

    new_publisher = AsyncAPIPublisher.create(
        routing_key=routing_key,
        queue=RabbitQueue.validate(queue),
        exchange=RabbitExchange.validate(exchange),
        message_kwargs=default_message_options,
        # Specific
        broker_middlewares=self._middlewares,
        middlewares=middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )

    # `cast` only narrows the static type of the registered object.
    return cast(AsyncAPIPublisher, super().publisher(new_publisher))

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object.

    Every subscriber and publisher of `router` receives the combined prefix
    (`self.prefix` followed by `prefix`). Entities that are not registered on
    this object yet (looked up by `hash`) are additionally given the merged
    middlewares/dependencies and are stored in `_subscribers` / `_publishers`.

    Args:
        router: Object whose `_subscribers` and `_publishers` are included.
        prefix: Extra prefix appended to `self.prefix` for included entities.
        dependencies: Dependencies inserted between the ones of this object
            and the subscriber's own broker-level dependencies.
        middlewares: Middlewares inserted between the ones of this object and
            the entity's own broker-level middlewares.
        include_in_schema: When not `None`, overrides the entity flag;
            otherwise the flag is resolved by `_solve_include_in_schema`.
    """
    full_prefix = "".join((self.prefix, prefix))
    # Materialize once: a one-shot iterable would otherwise be exhausted by
    # the first handler and silently skipped for all the following ones.
    middlewares = tuple(middlewares)
    dependencies = tuple(dependencies)

    for h in router._subscribers.values():
        h.add_prefix(full_prefix)

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        # Publishers get the same combined prefix as subscribers, so the
        # `prefix` argument is honoured on both sides.
        p.add_prefix(full_prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Include several routers at once.

    Each positional argument is passed, in order, to `include_router` with
    that method's default options.
    """
    for router in routers:
        self.include_router(router)

get_fmt #

get_fmt()
Source code in faststream/rabbit/broker/logging.py
def get_fmt(self) -> str:
    """Build the `logging` format string used for broker messages.

    The exchange, queue and message-id columns are left-aligned and padded to
    the widths currently stored on the instance.
    """
    head = "%(asctime)s %(levelname)-8s - "
    exchange_column = f"%(exchange)-{self._max_exchange_len}s | "
    queue_column = f"%(queue)-{self._max_queue_len}s | "
    message_id_column = f"%(message_id)-{self.__max_msg_id_ln}s "
    tail = "- %(message)s"
    return "".join((head, exchange_column, queue_column, message_id_column, tail))

setup_subscriber #

setup_subscriber(subscriber, **kwargs)

Setup the Subscriber to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Call `subscriber.setup` with broker defaults merged with `kwargs`.

    Defaults are read from `self._subscriber_setup_extra`; a value passed in
    `kwargs` wins over a default with the same name.
    """
    # Merge into a fresh dict so the broker-level defaults stay untouched.
    setup_options = {**self._subscriber_setup_extra, **kwargs}
    subscriber.setup(**setup_options)

setup_publisher #

setup_publisher(publisher, **kwargs)

Setup the Publisher to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Call `publisher.setup` with broker defaults merged with `kwargs`.

    Defaults are read from `self._publisher_setup_extra`; a value passed in
    `kwargs` wins over a default with the same name.
    """
    # Merge into a fresh dict so the broker-level defaults stay untouched.
    setup_options = {**self._publisher_setup_extra, **kwargs}
    publisher.setup(**setup_options)

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Stop the broker: close subscribers, then the connection if present.

    `running` is reset first, every registered subscriber is closed one by
    one, and `_close` is awaited with the exception details only when a
    connection object exists.
    """
    self.running = False

    for subscriber in self._subscribers.values():
        await subscriber.close()

    # Nothing to tear down when no connection was ever stored.
    if self._connection is None:
        return

    await self._close(exc_type, exc_val, exc_tb)

connect async #

connect(url=EMPTY, *, host=None, port=None, virtualhost=None, ssl_options=None, client_properties=None, security=None, timeout=None, fail_fast=EMPTY, reconnect_interval=EMPTY, channel_number=EMPTY, publisher_confirms=EMPTY, on_return_raises=EMPTY)

Connect broker object to RabbitMQ.

To start the subscribers as well, call broker.start() after, or instead of, this method.

Source code in faststream/rabbit/broker/broker.py
@override
async def connect(  # type: ignore[override]
    self,
    url: Annotated[
        Union[str, "URL", None],
        Doc("RabbitMQ destination location to connect."),
    ] = EMPTY,
    *,
    host: Annotated[
        Optional[str],
        Doc("Destination host. This option overrides `url` option host."),
    ] = None,
    port: Annotated[
        Optional[int],
        Doc("Destination port. This option overrides `url` option port."),
    ] = None,
    virtualhost: Annotated[
        Optional[str],
        Doc("RabbitMQ virtual host to use in the current broker connection."),
    ] = None,
    ssl_options: Annotated[
        Optional["SSLOptions"],
        Doc("Extra ssl options to establish connection."),
    ] = None,
    client_properties: Annotated[
        Optional["FieldTable"],
        Doc("Add custom client capability."),
    ] = None,
    security: Annotated[
        Optional["BaseSecurity"],
        Doc(
            "Security options to connect broker and generate AsyncAPI server security information."
        ),
    ] = None,
    timeout: Annotated[
        "TimeoutType",
        Doc("Connection establishement timeout."),
    ] = None,
    fail_fast: Annotated[
        bool,
        Doc(
            "Broker startup raises `AMQPConnectionError` if RabbitMQ is unreachable."
        ),
    ] = EMPTY,
    reconnect_interval: Annotated[
        "TimeoutType",
        Doc("Time to sleep between reconnection attempts."),
    ] = EMPTY,
    # channel args
    channel_number: Annotated[
        Optional[int],
        Doc("Specify the channel number explicit."),
    ] = EMPTY,
    publisher_confirms: Annotated[
        bool,
        Doc(
            "if `True` the `publish` method will "
            "return `bool` type after publish is complete. "
            "Otherwise it will returns `None`."
        ),
    ] = EMPTY,
    on_return_raises: Annotated[
        bool,
        Doc(
            "raise an :class:`aio_pika.exceptions.DeliveryError` "
            "when mandatory message will be returned"
        ),
    ] = EMPTY,
) -> "RobustConnection":
    """Connect broker object to RabbitMQ.

    To startup subscribers too you should use `broker.start()` after/instead this method.

    Only options that were explicitly provided are forwarded to the parent
    `connect`: arguments left at `EMPTY` are omitted, and a new URL is built
    only when `url` or one of the URL-related options is given.

    Returns:
        The value returned by the parent `connect` implementation.
    """
    kwargs: AnyDict = {}

    if channel_number is not EMPTY:
        kwargs["channel_number"] = channel_number

    if publisher_confirms is not EMPTY:
        kwargs["publisher_confirms"] = publisher_confirms

    if on_return_raises is not EMPTY:
        kwargs["on_return_raises"] = on_return_raises

    # NOTE: truthiness check, so both `None` and `0` are not forwarded.
    if timeout:
        kwargs["timeout"] = timeout

    if fail_fast is not EMPTY:
        kwargs["fail_fast"] = fail_fast

    if reconnect_interval is not EMPTY:
        kwargs["reconnect_interval"] = reconnect_interval

    # `EMPTY` means "not passed"; `build_url` below receives `None` instead.
    url = None if url is EMPTY else url

    if url or any(
        (host, port, virtualhost, ssl_options, client_properties, security)
    ):
        security_args = parse_security(security)

        kwargs["url"] = build_url(
            url,
            host=host,
            port=port,
            virtualhost=virtualhost,
            ssl_options=ssl_options,
            client_properties=client_properties,
            login=security_args.get("login"),
            password=security_args.get("password"),
            ssl=security_args.get("ssl"),
        )

        if ssl_context := security_args.get("ssl_context"):
            kwargs["ssl_context"] = ssl_context

    return await super().connect(**kwargs)

start async #

start()

Connect broker to RabbitMQ and startup all subscribers.

Source code in faststream/rabbit/broker/broker.py
async def start(self) -> None:
    """Start the broker, declare publisher exchanges and run subscribers.

    After the parent `start` finishes, every publisher exchange that is not
    `None` is passed to `declare_exchange`, then each subscriber is logged
    and started in registration order.
    """
    await super().start()

    assert self.declarer, NOT_CONNECTED_YET  # nosec B101

    for pub in self._publishers.values():
        if (target_exchange := pub.exchange) is not None:
            await self.declare_exchange(target_exchange)

    for sub in self._subscribers.values():
        waiting_message = f"`{sub.call_name}` waiting for messages"
        self._log(waiting_message, extra=sub.get_log_context(None))
        await sub.start()

publish async #

publish(message=None, queue='', exchange=None, *, routing_key='', mandatory=True, immediate=False, timeout=None, persist=False, reply_to=None, rpc=False, rpc_timeout=30.0, raise_timeout=False, correlation_id=None, headers=None, content_type=None, content_encoding=None, expiration=None, message_id=None, timestamp=None, message_type=None, user_id=None, priority=None)

Publish message directly.

This method lets you publish a message without documenting it in AsyncAPI. You can use it from applications built on other frameworks or to publish messages only occasionally.

For regular publishing, please use @broker.publisher(...) or broker.publisher(...).publish(...) instead.

Source code in faststream/rabbit/broker/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    message: Annotated[
        "AioPikaSendableMessage",
        Doc("Message body to send."),
    ] = None,
    queue: Annotated[
        Union["RabbitQueue", str],
        Doc("Message routing key to publish with."),
    ] = "",
    exchange: Annotated[
        Union["RabbitExchange", str, None],
        Doc("Target exchange to publish message to."),
    ] = None,
    *,
    routing_key: Annotated[
        str,
        Doc(
            "Message routing key to publish with. "
            "Overrides `queue` option if presented."
        ),
    ] = "",
    mandatory: Annotated[
        bool,
        Doc(
            "Client waits for confirmation that the message is placed to some queue. "
            "RabbitMQ returns message to client if there is no suitable queue."
        ),
    ] = True,
    immediate: Annotated[
        bool,
        Doc(
            "Client expects that there is consumer ready to take the message to work. "
            "RabbitMQ returns message to client if there is no suitable consumer."
        ),
    ] = False,
    timeout: Annotated[
        "TimeoutType",
        Doc("Send confirmation time from RabbitMQ."),
    ] = None,
    persist: Annotated[
        bool,
        Doc("Restore the message on RabbitMQ reboot."),
    ] = False,
    reply_to: Annotated[
        Optional[str],
        Doc(
            "Reply message routing key to send with (always sending to default exchange)."
        ),
    ] = None,
    rpc: Annotated[
        bool,
        Doc("Whether to wait for reply in blocking mode."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    rpc_timeout: Annotated[
        Optional[float],
        Doc("RPC reply waiting time."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method with `timeout` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = 30.0,
    raise_timeout: Annotated[
        bool,
        Doc(
            "Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. "
            "RPC request returns `None` at timeout by default."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "`request` always raises TimeoutError instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    # message args
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    headers: Annotated[
        Optional["HeadersType"],
        Doc("Message headers to store metainformation."),
    ] = None,
    content_type: Annotated[
        Optional[str],
        Doc(
            "Message **content-type** header. "
            "Used by application, not core RabbitMQ. "
            "Will be set automatically if not specified."
        ),
    ] = None,
    content_encoding: Annotated[
        Optional[str],
        Doc("Message body content encoding, e.g. **gzip**."),
    ] = None,
    expiration: Annotated[
        Optional["DateType"],
        Doc("Message expiration (lifetime) in seconds (or datetime or timedelta)."),
    ] = None,
    message_id: Annotated[
        Optional[str],
        Doc("Arbitrary message id. Generated automatically if not presented."),
    ] = None,
    timestamp: Annotated[
        Optional["DateType"],
        Doc("Message publish timestamp. Generated automatically if not presented."),
    ] = None,
    message_type: Annotated[
        Optional[str],
        Doc("Application-specific message type, e.g. **orders.created**."),
    ] = None,
    user_id: Annotated[
        Optional[str],
        Doc("Publisher connection User ID, validated if set."),
    ] = None,
    priority: Annotated[
        Optional[int],
        Doc("The message priority (0 by default)."),
    ] = None,
) -> Optional[Any]:
    """Publish a single message without a pre-registered publisher.

    This path is not reflected in the AsyncAPI documentation. It is meant for
    occasional publishing or for use from applications built on other
    frameworks; for regular publishing prefer `@broker.publisher(...)` or
    `broker.publisher(...).publish(...)`.

    Returns:
        The result of the parent `publish` call.
    """
    # An explicit routing key wins; otherwise it is derived from `queue`.
    if not routing_key:
        routing_key = RabbitQueue.validate(queue).routing

    # Generate a correlation id when the caller did not supply one.
    if not correlation_id:
        correlation_id = gen_cor_id()

    return await super().publish(
        message,
        producer=self._producer,
        routing_key=routing_key,
        app_id=self.app_id,
        exchange=exchange,
        mandatory=mandatory,
        immediate=immediate,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
        content_type=content_type,
        content_encoding=content_encoding,
        expiration=expiration,
        message_id=message_id,
        timestamp=timestamp,
        message_type=message_type,
        user_id=user_id,
        timeout=timeout,
        priority=priority,
        rpc=rpc,
        rpc_timeout=rpc_timeout,
        raise_timeout=raise_timeout,
    )

request async #

request(message=None, queue='', exchange=None, *, routing_key='', mandatory=True, immediate=False, timeout=None, persist=False, correlation_id=None, headers=None, content_type=None, content_encoding=None, expiration=None, message_id=None, timestamp=None, message_type=None, user_id=None, priority=None)
Source code in faststream/rabbit/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: Annotated[
        "AioPikaSendableMessage",
        Doc("Message body to send."),
    ] = None,
    queue: Annotated[
        Union["RabbitQueue", str],
        Doc("Message routing key to publish with."),
    ] = "",
    exchange: Annotated[
        Union["RabbitExchange", str, None],
        Doc("Target exchange to publish message to."),
    ] = None,
    *,
    routing_key: Annotated[
        str,
        Doc(
            "Message routing key to publish with. "
            "Overrides `queue` option if presented."
        ),
    ] = "",
    mandatory: Annotated[
        bool,
        Doc(
            "Client waits for confirmation that the message is placed to some queue. "
            "RabbitMQ returns message to client if there is no suitable queue."
        ),
    ] = True,
    immediate: Annotated[
        bool,
        Doc(
            "Client expects that there is consumer ready to take the message to work. "
            "RabbitMQ returns message to client if there is no suitable consumer."
        ),
    ] = False,
    timeout: Annotated[
        "TimeoutType",
        Doc("Send confirmation time from RabbitMQ."),
    ] = None,
    persist: Annotated[
        bool,
        Doc("Restore the message on RabbitMQ reboot."),
    ] = False,
    # message args
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    headers: Annotated[
        Optional["HeadersType"],
        Doc("Message headers to store metainformation."),
    ] = None,
    content_type: Annotated[
        Optional[str],
        Doc(
            "Message **content-type** header. "
            "Used by application, not core RabbitMQ. "
            "Will be set automatically if not specified."
        ),
    ] = None,
    content_encoding: Annotated[
        Optional[str],
        Doc("Message body content encoding, e.g. **gzip**."),
    ] = None,
    expiration: Annotated[
        Optional["DateType"],
        Doc("Message expiration (lifetime) in seconds (or datetime or timedelta)."),
    ] = None,
    message_id: Annotated[
        Optional[str],
        Doc("Arbitrary message id. Generated automatically if not presented."),
    ] = None,
    timestamp: Annotated[
        Optional["DateType"],
        Doc("Message publish timestamp. Generated automatically if not presented."),
    ] = None,
    message_type: Annotated[
        Optional[str],
        Doc("Application-specific message type, e.g. **orders.created**."),
    ] = None,
    user_id: Annotated[
        Optional[str],
        Doc("Publisher connection User ID, validated if set."),
    ] = None,
    priority: Annotated[
        Optional[int],
        Doc("The message priority (0 by default)."),
    ] = None,
) -> "RabbitMessage":
    """Send `message` and return what the parent `request` implementation yields.

    The routing key is `routing_key` when it is non-empty; otherwise it is
    taken from `queue` after normalizing it with `RabbitQueue.validate`.
    When no `correlation_id` is supplied, one is generated with
    `gen_cor_id()`. The broker's own producer and `app_id` are always
    forwarded alongside the caller's options.
    """
    # An explicit routing key wins; fall back to the queue's routing otherwise.
    if routing_key:
        target_key = routing_key
    else:
        target_key = RabbitQueue.validate(queue).routing

    # Every outgoing message carries a correlation id, generated on demand.
    if not correlation_id:
        correlation_id = gen_cor_id()

    response: RabbitMessage = await super().request(
        message,
        # broker-level settings
        producer=self._producer,
        app_id=self.app_id,
        # delivery options
        exchange=exchange,
        routing_key=target_key,
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        # message properties
        correlation_id=correlation_id,
        headers=headers,
        content_type=content_type,
        content_encoding=content_encoding,
        expiration=expiration,
        message_id=message_id,
        timestamp=timestamp,
        message_type=message_type,
        user_id=user_id,
        priority=priority,
    )
    return response

declare_queue async #

declare_queue(queue)

Declares queue object in RabbitMQ.

Source code in faststream/rabbit/broker/broker.py
async def declare_queue(
    self,
    queue: Annotated[
        "RabbitQueue",
        Doc("Queue object to create."),
    ],
) -> "RobustQueue":
    """Declare `queue` in **RabbitMQ** via the broker's declarer.

    The call is delegated to `self.declarer.declare_queue` and its result
    is returned unchanged. The declarer must already be set; otherwise the
    assertion below fails with `NOT_CONNECTED_YET`.
    """
    declarer = self.declarer
    assert declarer, NOT_CONNECTED_YET  # nosec B101
    declared = await declarer.declare_queue(queue)
    return declared

declare_exchange async #

declare_exchange(exchange)

Declares exchange object in RabbitMQ.

Source code in faststream/rabbit/broker/broker.py
async def declare_exchange(
    self,
    exchange: Annotated[
        "RabbitExchange",
        Doc("Exchange object to create."),
    ],
) -> "RobustExchange":
    """Declare `exchange` in **RabbitMQ** via the broker's declarer.

    The call is delegated to `self.declarer.declare_exchange` and its
    result is returned unchanged. The declarer must already be set;
    otherwise the assertion below fails with `NOT_CONNECTED_YET`.
    """
    declarer = self.declarer
    assert declarer, NOT_CONNECTED_YET  # nosec B101
    declared = await declarer.declare_exchange(exchange)
    return declared

ping async #

ping(timeout)
Source code in faststream/rabbit/broker/broker.py
@override
async def ping(self, timeout: Optional[float]) -> bool:
    """Report whether the broker connection is currently open.

    Returns `True` as soon as `self._connection` reports that it is not
    closed. Returns `False` when there is no connection object at all, or
    when the `anyio.move_on_after(timeout)` scope is cancelled before the
    connection is seen open.
    """
    # Poll at a tenth of the timeout; a falsy timeout (None or 0) is treated
    # as a 10-second window for this computation, i.e. a 1-second interval.
    poll_interval = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as scope:
        if self._connection is None:
            return False

        # Re-read the connection on every pass so a replaced object is noticed.
        while not scope.cancel_called:
            if not self._connection.is_closed:
                return True

            await anyio.sleep(poll_interval)

    # Reached when the scope was cancelled, either between polls or mid-sleep.
    return False