Skip to content

FakeProducer

faststream.rabbit.testing.FakeProducer #

FakeProducer(broker)

Bases: AioPikaFastProducer

A fake RabbitMQ producer for testing purposes.

This class extends AioPikaFastProducer and is used to simulate RabbitMQ message publishing during tests.

Source code in faststream/rabbit/testing.py
def __init__(self, broker: RabbitBroker) -> None:
    self.broker = broker

    default_parser = AioPikaParser()
    self._parser = resolve_custom_func(broker._parser, default_parser.parse_message)
    self._decoder = resolve_custom_func(
        broker._decoder, default_parser.decode_message
    )

declarer instance-attribute #

declarer = declarer

broker instance-attribute #

broker = broker

publish async #

publish(message, exchange=None, *, correlation_id='', routing_key='', mandatory=True, immediate=False, timeout=None, rpc=False, rpc_timeout=30.0, raise_timeout=False, persist=False, reply_to=None, headers=None, content_type=None, content_encoding=None, priority=None, expiration=None, message_id=None, timestamp=None, message_type=None, user_id=None, app_id=None)

Publish a message to a RabbitMQ queue or exchange.

Source code in faststream/rabbit/testing.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "AioPikaSendableMessage",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    correlation_id: str = "",
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    persist: bool = False,
    reply_to: Optional[str] = None,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    priority: Optional[int] = None,
    expiration: Optional["DateType"] = None,
    message_id: Optional[str] = None,
    timestamp: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
    app_id: Optional[str] = None,
) -> Optional[Any]:
    """Publish a message to a RabbitMQ queue or exchange."""
    exch = RabbitExchange.validate(exchange)

    if rpc and reply_to:
        raise WRONG_PUBLISH_ARGS

    incoming = build_message(
        message=message,
        exchange=exch,
        routing_key=routing_key,
        reply_to=reply_to,
        app_id=app_id,
        user_id=user_id,
        message_type=message_type,
        headers=headers,
        persist=persist,
        message_id=message_id,
        priority=priority,
        content_encoding=content_encoding,
        content_type=content_type,
        correlation_id=correlation_id,
        expiration=expiration,
        timestamp=timestamp,
    )

    for handler in self.broker._subscribers.values():  # pragma: no branch
        if _is_handler_suitable(
            handler, incoming.routing_key, incoming.headers, exch
        ):
            with timeout_scope(rpc_timeout, raise_timeout):
                response = await self._execute_handler(incoming, handler)
                if rpc:
                    return await self._decoder(await self._parser(response))

    return None

request async #

request(message='', exchange=None, *, correlation_id='', routing_key='', mandatory=True, immediate=False, timeout=None, persist=False, headers=None, content_type=None, content_encoding=None, priority=None, expiration=None, message_id=None, timestamp=None, message_type=None, user_id=None, app_id=None)

Publish a message to a RabbitMQ queue or exchange.

Source code in faststream/rabbit/testing.py
@override
async def request(  # type: ignore[override]
    self,
    message: "AioPikaSendableMessage" = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    correlation_id: str = "",
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: Optional[float] = None,
    persist: bool = False,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    priority: Optional[int] = None,
    expiration: Optional["DateType"] = None,
    message_id: Optional[str] = None,
    timestamp: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
    app_id: Optional[str] = None,
) -> "PatchedMessage":
    """Publish a message to a RabbitMQ queue or exchange."""
    exch = RabbitExchange.validate(exchange)

    incoming = build_message(
        message=message,
        exchange=exch,
        routing_key=routing_key,
        app_id=app_id,
        user_id=user_id,
        message_type=message_type,
        headers=headers,
        persist=persist,
        message_id=message_id,
        priority=priority,
        content_encoding=content_encoding,
        content_type=content_type,
        correlation_id=correlation_id,
        expiration=expiration,
        timestamp=timestamp,
    )

    for handler in self.broker._subscribers.values():  # pragma: no branch
        if _is_handler_suitable(
            handler, incoming.routing_key, incoming.headers, exch
        ):
            with anyio.fail_after(timeout):
                return await self._execute_handler(incoming, handler)

    raise SubscriberNotFound