Skip to content

FakeProducer

faststream.nats.testing.FakeProducer #

FakeProducer(broker)

Bases: NatsFastProducer

Source code in faststream/nats/testing.py
def __init__(self, broker: NatsBroker) -> None:
    """Bind the fake producer to ``broker`` and set up its parser/decoder.

    The broker's ``_parser`` and ``_decoder`` are each passed to
    ``resolve_custom_func`` together with the matching method of a stock
    ``NatsParser`` (built with an empty pattern and ``no_ack=False``);
    the results are stored as ``self._parser`` and ``self._decoder``.

    Args:
        broker: The NATS broker whose subscribers this producer feeds.
    """
    self.broker = broker

    fallback_parser = NatsParser(pattern="", no_ack=False)
    self._parser = resolve_custom_func(
        broker._parser,
        fallback_parser.parse_message,
    )
    self._decoder = resolve_custom_func(
        broker._decoder,
        fallback_parser.decode_message,
    )

broker instance-attribute #

broker = broker

publish async #

publish(message, subject, reply_to='', headers=None, correlation_id=None, timeout=None, stream=None, *, rpc=False, rpc_timeout=None, raise_timeout=False)
Source code in faststream/nats/testing.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    reply_to: str = "",
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    # NatsJSFastProducer compatibility
    timeout: Optional[float] = None,
    stream: Optional[str] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
) -> Any:
    """Hand a message directly to the broker's matching subscribers.

    A message is built once with ``build_message`` and then executed against
    every subscriber for which ``_is_handler_suitable(handler, subject,
    stream)`` is true. Subscribers exposing a truthy ``pull_sub`` whose
    ``batch`` attribute is truthy receive the message wrapped in a
    one-element list.

    Args:
        message: Payload to deliver.
        subject: Subject the message is published to.
        reply_to: Reply subject placed on the built message.
        headers: Optional headers placed on the built message.
        correlation_id: Optional correlation id placed on the built message.
        timeout: Accepted for signature compatibility; not used in this body.
        stream: Forwarded to the subscriber suitability check.
        rpc: When true, return the parsed and decoded response of the first
            matching subscriber instead of continuing to the next one.
        rpc_timeout: Forwarded to ``timeout_scope`` around handler execution.
        raise_timeout: Forwarded to ``timeout_scope`` around handler execution.

    Returns:
        The decoded response when ``rpc`` is true and a subscriber matched
        (and the handler completed inside the timeout scope), otherwise
        ``None``.

    Raises:
        WRONG_PUBLISH_ARGS: If both ``rpc`` and ``reply_to`` are set.
    """
    if rpc and reply_to:
        raise WRONG_PUBLISH_ARGS

    incoming = build_message(
        message=message,
        subject=subject,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )

    for handler in self.broker._subscribers.values():  # pragma: no branch
        if not _is_handler_suitable(handler, subject, stream):
            continue

        # Batch pull subscribers consume a list of messages, not a single one.
        pull_sub = getattr(handler, "pull_sub", None)
        payload: Union[List[PatchedMessage], PatchedMessage] = (
            [incoming] if pull_sub and pull_sub.batch else incoming
        )

        with timeout_scope(rpc_timeout, raise_timeout):
            response = await self._execute_handler(payload, subject, handler)
            if rpc:
                parsed = await self._parser(response)
                return await self._decoder(parsed)

    return None

request async #

request(message, subject, *, correlation_id=None, headers=None, timeout=0.5, stream=None)
Source code in faststream/nats/testing.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    *,
    correlation_id: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 0.5,
    # NatsJSFastProducer compatibility
    stream: Optional[str] = None,
) -> "PatchedMessage":
    """Run the first matching subscriber and return its result.

    A message is built with ``build_message`` and executed against the first
    subscriber for which ``_is_handler_suitable(handler, subject, stream)``
    is true. Subscribers exposing a truthy ``pull_sub`` whose ``batch``
    attribute is truthy receive the message wrapped in a one-element list.

    Args:
        message: Payload to deliver.
        subject: Subject the request is sent to.
        correlation_id: Optional correlation id placed on the built message.
        headers: Optional headers placed on the built message.
        timeout: Seconds passed to ``anyio.fail_after`` around the handler
            execution.
        stream: Forwarded to the subscriber suitability check.

    Returns:
        Whatever ``self._execute_handler`` returns for the matched subscriber.

    Raises:
        SubscriberNotFound: If no subscriber is suitable for the request.
    """
    incoming = build_message(
        message=message,
        subject=subject,
        headers=headers,
        correlation_id=correlation_id,
    )

    for handler in self.broker._subscribers.values():  # pragma: no branch
        if not _is_handler_suitable(handler, subject, stream):
            continue

        # Batch pull subscribers consume a list of messages, not a single one.
        pull_sub = getattr(handler, "pull_sub", None)
        payload: Union[List[PatchedMessage], PatchedMessage] = (
            [incoming] if pull_sub and pull_sub.batch else incoming
        )

        with anyio.fail_after(timeout):
            return await self._execute_handler(payload, subject, handler)

    raise SubscriberNotFound