Skip to content

FakeProducer

faststream.rabbit.test.FakeProducer #

FakeProducer(broker: RabbitBroker)

Bases: AioPikaFastProducer

A fake RabbitMQ producer for testing purposes.

This class extends AioPikaFastProducer and is used to simulate RabbitMQ message publishing during tests.

Initialize a FakeProducer instance.

PARAMETER DESCRIPTION
broker

The RabbitBroker instance to be used for message publishing.

TYPE: RabbitBroker

Source code in faststream/rabbit/test.py
def __init__(self, broker: RabbitBroker) -> None:
    """Initialize a FakeProducer instance.

    Args:
        broker (RabbitBroker): The RabbitBroker instance to be used for message publishing.
    """
    self.broker = broker

broker instance-attribute #

broker = broker

declarer instance-attribute #

publish async #

publish(
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    **message_kwargs: Any
) -> Optional[SendableMessage]

Publish a message to a RabbitMQ queue or exchange.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: AioPikaSendableMessage DEFAULT: ''

queue

The target queue for the message.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

The target exchange for the message.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

The routing key for the message.

TYPE: str DEFAULT: ''

mandatory

Whether the message is mandatory.

TYPE: bool DEFAULT: True

immediate

Whether the message should be sent immediately.

TYPE: bool DEFAULT: False

timeout

The timeout for the message.

TYPE: TimeoutType DEFAULT: None

rpc

Whether the message is for RPC.

TYPE: bool DEFAULT: False

rpc_timeout

The RPC timeout.

TYPE: float DEFAULT: 30.0

raise_timeout

Whether to raise a timeout exception.

TYPE: bool DEFAULT: False

persist

Whether to persist the message.

TYPE: bool DEFAULT: False

reply_to

The reply-to address for RPC messages.

TYPE: str DEFAULT: None

**message_kwargs

Additional message properties and content.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Optional[SendableMessage]

Optional[SendableMessage]: The published message if successful, or None if not.

Source code in faststream/rabbit/test.py
async def publish(
    self,
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: TimeoutType = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    **message_kwargs: Any,
) -> Optional[SendableMessage]:
    """Publish a message to a RabbitMQ queue or exchange.

    Args:
        message (AioPikaSendableMessage, optional): The message to be published.
        queue (Union[RabbitQueue, str], optional): The target queue for the message.
        exchange (Union[RabbitExchange, str, None], optional): The target exchange for the message.
        routing_key (str, optional): The routing key for the message.
        mandatory (bool, optional): Whether the message is mandatory.
        immediate (bool, optional): Whether the message should be sent immediately.
        timeout (TimeoutType, optional): The timeout for the message.
        rpc (bool, optional): Whether the message is for RPC.
        rpc_timeout (float, optional): The RPC timeout.
        raise_timeout (bool, optional): Whether to raise a timeout exception.
        persist (bool, optional): Whether to persist the message.
        reply_to (str, optional): The reply-to address for RPC messages.
        **message_kwargs (Any): Additional message properties and content.

    Returns:
        Optional[SendableMessage]: The published message if successful, or None if not.
    """
    exch = RabbitExchange.validate(exchange)

    incoming = build_message(
        message=message,
        queue=queue,
        exchange=exch,
        routing_key=routing_key,
        reply_to=reply_to,
        **message_kwargs,
    )

    for handler in self.broker.handlers.values():  # pragma: no branch
        if handler.exchange == exch:
            call: bool = False

            if (
                handler.exchange is None
                or handler.exchange.type == ExchangeType.DIRECT
            ):
                call = handler.queue.name == incoming.routing_key

            elif handler.exchange.type == ExchangeType.FANOUT:
                call = True

            elif handler.exchange.type == ExchangeType.TOPIC:
                call = True

                for current, base in zip_longest(
                    (incoming.routing_key or "").split("."),
                    handler.queue.routing.split("."),
                    fillvalue=None,
                ):
                    if base == "#":
                        break

                    if base != "*" and current != base:
                        call = False
                        break

            elif handler.exchange.type == ExchangeType.HEADERS:  # pramga: no branch
                queue_headers = (handler.queue.bind_arguments or {}).copy()
                msg_headers = incoming.headers

                if not queue_headers:
                    call = True

                else:
                    matcher = queue_headers.pop("x-match", "all")

                    full = True
                    none = True
                    for k, v in queue_headers.items():
                        if msg_headers.get(k) != v:
                            full = False
                        else:
                            none = False

                    if not none:
                        call = (matcher == "any") or full

            else:  # pragma: no cover
                raise AssertionError("unreachable")

            if call:
                r = await call_handler(
                    handler=handler,
                    message=incoming,
                    rpc=rpc,
                    rpc_timeout=rpc_timeout,
                    raise_timeout=raise_timeout,
                )

                if rpc:  # pragma: no branch
                    return r

    return None