Skip to content

Publisher

faststream.rabbit.asyncapi.Publisher #

Bases: LogicPublisher

A class representing a publisher.

METHOD DESCRIPTION
get_payloads

Get the payloads for the publisher

calls class-attribute instance-attribute #

calls: List[Callable[..., Any]] = field(
    init=False, default_factory=list, repr=False
)

description property #

description: Optional[str]

exchange class-attribute instance-attribute #

exchange: Optional[RabbitExchange] = field(default=None)

immediate class-attribute instance-attribute #

immediate: bool = False

include_in_schema class-attribute instance-attribute #

include_in_schema: bool = field(default=True)

mandatory class-attribute instance-attribute #

mandatory: bool = True

message_kwargs class-attribute instance-attribute #

message_kwargs: AnyDict = field(default_factory=dict)

mock class-attribute instance-attribute #

mock: Optional[MagicMock] = field(
    init=False, default=None, repr=False
)

name property #

name: str

persist class-attribute instance-attribute #

persist: bool = False

priority class-attribute instance-attribute #

priority: Optional[int] = None

queue class-attribute instance-attribute #

queue: RabbitQueue = field(default=RabbitQueue(''))

reply_to class-attribute instance-attribute #

reply_to: Optional[str] = None

routing property #

routing: str | None

routing_key class-attribute instance-attribute #

routing_key: str = ''

timeout class-attribute instance-attribute #

timeout: TimeoutType = None

title class-attribute instance-attribute #

title: Optional[str] = field(default=None)

virtual_host class-attribute instance-attribute #

virtual_host: str = '/'

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    if self._schema:
        call_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(
                "",
                __config__=get_config_base(),  # type: ignore[arg-type]
                response__=(self._schema, ...),
            ),
        )

        body = get_response_schema(
            call_model,
            prefix=f"{self.name}:Message",
        )
        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        for call in self.calls:
            call_model = build_call_model(call)
            body = get_response_schema(
                call_model,
                prefix=f"{self.name}:Message",
            )
            if body:
                payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads

publish async #

publish(
    message: AioPikaSendableMessage = "",
    *,
    rpc: bool = False,
    rpc_timeout: float | None = 30.0,
    raise_timeout: bool = False,
    correlation_id: str | None = None,
    priority: int | None = None,
    **message_kwargs: Any
) -> ConfirmationFrameType | SendableMessage

Publish a message.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: AioPikaSendableMessage DEFAULT: ''

rpc

Whether the message is for RPC (Remote Procedure Call).

TYPE: bool DEFAULT: False

rpc_timeout

Timeout for RPC.

TYPE: float | None DEFAULT: 30.0

raise_timeout

Whether to raise an exception if timeout occurs.

TYPE: bool DEFAULT: False

correlation_id

Correlation ID for the message.

TYPE: str | None DEFAULT: None

priority

Priority for the message.

TYPE: int | None DEFAULT: None

**message_kwargs

Additional keyword arguments for the message.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ConfirmationFrameType | SendableMessage

ConfirmationFrameType or SendableMessage: The result of the publish operation.

RAISES DESCRIPTION
AssertionError

If _producer is not set up.

Source code in faststream/rabbit/publisher.py
@override
async def publish(  # type: ignore[override]
    self,
    message: AioPikaSendableMessage = "",
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    correlation_id: Optional[str] = None,
    priority: Optional[int] = None,
    **message_kwargs: Any,
) -> Union[aiormq.abc.ConfirmationFrameType, SendableMessage]:
    """Publish a message.

    Args:
        message: The message to be published.
        rpc: Whether the message is for RPC (Remote Procedure Call).
        rpc_timeout: Timeout for RPC.
        raise_timeout: Whether to raise an exception if timeout occurs.
        correlation_id: Correlation ID for the message.
        priority: Priority for the message.
        **message_kwargs: Additional keyword arguments for the message.

    Returns:
        ConfirmationFrameType or SendableMessage: The result of the publish operation.

    Raises:
        AssertionError: If `_producer` is not set up.

    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101
    return await self._producer.publish(
        message=message,
        exchange=self.exchange,
        routing_key=self.routing,
        mandatory=self.mandatory,
        immediate=self.immediate,
        timeout=self.timeout,
        rpc=rpc,
        rpc_timeout=rpc_timeout,
        raise_timeout=raise_timeout,
        persist=self.persist,
        reply_to=self.reply_to,
        correlation_id=correlation_id,
        priority=priority or self.priority,
        **self.message_kwargs,
        **message_kwargs,
    )

reset_test #

reset_test() -> None
Source code in faststream/broker/publisher.py
def reset_test(self) -> None:
    self._fake_handler = False
    self.mock = None

schema #

schema() -> Dict[str, Channel]
Source code in faststream/rabbit/asyncapi.py
def schema(self) -> Dict[str, Channel]:
    if not self.include_in_schema:
        return {}

    payloads = self.get_payloads()

    return {
        self.name: Channel(
            description=self.description,  # type: ignore[attr-defined]
            publish=Operation(
                bindings=OperationBinding(
                    amqp=amqp.OperationBinding(
                        cc=self.routing or None,
                        deliveryMode=2 if self.persist else 1,
                        mandatory=self.mandatory,
                        replyTo=self.reply_to,
                        priority=self.priority,
                    ),
                )
                if _is_exchange(self.exchange)
                else None,
                message=Message(
                    title=f"{self.name}:Message",
                    payload=resolve_payloads(
                        payloads,
                        "Publisher",
                        served_words=2 if self.title is None else 1,
                    ),
                    correlationId=CorrelationId(
                        location="$message.header#/correlation_id"
                    ),
                ),
            ),
            bindings=ChannelBinding(
                amqp=amqp.ChannelBinding(
                    **{
                        "is": "routingKey",  # type: ignore
                        "queue": amqp.Queue(
                            name=self.queue.name,
                            durable=self.queue.durable,
                            exclusive=self.queue.exclusive,
                            autoDelete=self.queue.auto_delete,
                            vhost=self.virtual_host,
                        )
                        if _is_exchange(self.exchange) and self.queue.name
                        else None,
                        "exchange": (
                            amqp.Exchange(type="default", vhost=self.virtual_host)
                            if self.exchange is None
                            else amqp.Exchange(
                                type=self.exchange.type,  # type: ignore
                                name=self.exchange.name,
                                durable=self.exchange.durable,
                                autoDelete=self.exchange.auto_delete,
                                vhost=self.virtual_host,
                            )
                        ),
                    }
                )
            ),
        )
    }

set_test #

set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
def set_test(
    self,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    self.mock = mock
    self._fake_handler = with_fake