Skip to content

RabbitPublisher

faststream.rabbit.router.RabbitPublisher #

RabbitPublisher(queue='', exchange=None, *, routing_key='', mandatory=True, immediate=False, timeout=None, persist=False, reply_to=None, priority=None, middlewares=(), title=None, description=None, schema=None, include_in_schema=True, headers=None, content_type=None, content_encoding=None, expiration=None, message_type=None, user_id=None)

Bases: ArgsContainer

Delayed RabbitPublisher registration object.

Just a copy of RabbitRegistrator.publisher(...) arguments.

Source code in faststream/rabbit/router.py
def __init__(
    self,
    queue: Annotated[
        Union["RabbitQueue", str],
        Doc("Default message routing key to publish with."),
    ] = "",
    exchange: Annotated[
        Union["RabbitExchange", str, None],
        Doc("Target exchange to publish message to."),
    ] = None,
    *,
    routing_key: Annotated[
        str,
        Doc(
            "Default message routing key to publish with. "
            "Overrides `queue` option if presented."
        ),
    ] = "",
    mandatory: Annotated[
        bool,
        Doc(
            "Client waits for confirmation that the message is placed to some queue. "
            "RabbitMQ returns message to client if there is no suitable queue."
        ),
    ] = True,
    immediate: Annotated[
        bool,
        Doc(
            "Client expects that there is consumer ready to take the message to work. "
            "RabbitMQ returns message to client if there is no suitable consumer."
        ),
    ] = False,
    timeout: Annotated[
        "TimeoutType",
        Doc("Send confirmation time from RabbitMQ."),
    ] = None,
    persist: Annotated[
        bool,
        Doc("Restore the message on RabbitMQ reboot."),
    ] = False,
    reply_to: Annotated[
        Optional[str],
        Doc(
            "Reply message routing key to send with (always sending to default exchange)."
        ),
    ] = None,
    priority: Annotated[
        Optional[int],
        Doc("The message priority (0 by default)."),
    ] = None,
    # basic args
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whether to include operation in AsyncAPI schema or not."),
    ] = True,
    # message args
    headers: Annotated[
        Optional["HeadersType"],
        Doc(
            "Message headers to store metainformation. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    content_type: Annotated[
        Optional[str],
        Doc(
            "Message **content-type** header. "
            "Used by application, not core RabbitMQ. "
            "Will be set automatically if not specified."
        ),
    ] = None,
    content_encoding: Annotated[
        Optional[str],
        Doc("Message body content encoding, e.g. **gzip**."),
    ] = None,
    expiration: Annotated[
        Optional["DateType"],
        Doc("Message expiration (lifetime) in seconds (or datetime or timedelta)."),
    ] = None,
    message_type: Annotated[
        Optional[str],
        Doc("Application-specific message type, e.g. **orders.created**."),
    ] = None,
    user_id: Annotated[
        Optional[str],
        Doc("Publisher connection User ID, validated if set."),
    ] = None,
) -> None:
    """Capture publisher options for delayed registration.

    Every argument is handed, unchanged and by keyword, to the base-class
    initializer; this method itself does no validation or other work.
    Per-argument descriptions live in the ``Doc(...)`` annotations of the
    signature.

    Only ``queue`` and ``exchange`` may be passed positionally; everything
    after the bare ``*`` is keyword-only.
    """
    # Everything is forwarded by keyword (including the two positional
    # parameters), so the base class receives a single named mapping.
    super().__init__(
        queue=queue,
        exchange=exchange,
        routing_key=routing_key,
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        priority=priority,
        # message args
        headers=headers,
        content_type=content_type,
        content_encoding=content_encoding,
        expiration=expiration,
        message_type=message_type,
        user_id=user_id,
        # basic args
        middlewares=middlewares,
        # AsyncAPI args
        title=title,
        description=description,
        schema=schema,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs