Skip to content

FakeProducer

faststream.nats.testing.FakeProducer #

FakeProducer(broker)

Bases: NatsFastProducer

Source code in faststream/nats/testing.py
def __init__(self, broker: NatsBroker) -> None:
    """Bind the producer to ``broker`` and pick its parser and decoder.

    A default ``NatsParser`` (empty pattern, ``no_ack=False``) supplies the
    fallback ``parse_message`` / ``decode_message`` callables; each one is
    combined with the broker's own ``_parser`` / ``_decoder`` through
    ``resolve_custom_func``.

    Args:
        broker: Broker whose subscribers this producer publishes to.
    """
    fallback = NatsParser(pattern="", no_ack=False)

    self.broker = broker
    self._parser = resolve_custom_func(
        broker._parser,
        fallback.parse_message,
    )
    self._decoder = resolve_custom_func(
        broker._decoder,
        fallback.decode_message,
    )

broker instance-attribute #

broker = broker

publish async #

publish(
    message,
    subject,
    reply_to="",
    headers=None,
    correlation_id=None,
    timeout=None,
    stream=None,
    *,
    rpc=False,
    rpc_timeout=None,
    raise_timeout=False,
)
Source code in faststream/nats/testing.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    reply_to: str = "",
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    # NatsJSFastProducer compatibility
    timeout: Optional[float] = None,
    stream: Optional[str] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
) -> Any:
    """Hand a fake message directly to every suitable broker subscriber.

    Args:
        message: Payload passed to ``build_message``.
        subject: Subject the message is built for and matched against.
        reply_to: Reply subject stored on the built message; may not be
            combined with ``rpc``.
        headers: Optional headers for the built message.
        correlation_id: Optional correlation id for the built message.
        timeout: Not used by this method; present so the signature lines up
            with ``NatsJSFastProducer``.
        stream: Forwarded to ``_is_handler_suitable`` when selecting
            subscribers.
        rpc: When true, return the parsed and decoded response of the first
            suitable subscriber instead of visiting the remaining ones.
        rpc_timeout: Passed to ``timeout_scope`` around each handler call.
        raise_timeout: Passed to ``timeout_scope`` around each handler call.

    Returns:
        The decoded response when ``rpc`` is true and a subscriber handled
        the message; otherwise ``None``.

    Raises:
        WRONG_PUBLISH_ARGS: If both ``rpc`` and ``reply_to`` are set.
    """
    if reply_to and rpc:
        raise WRONG_PUBLISH_ARGS

    incoming = build_message(
        message=message,
        subject=subject,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
    )

    for subscriber in self.broker._subscribers.values():  # pragma: no branch
        if not _is_handler_suitable(subscriber, subject, stream):
            continue

        # Subscribers exposing a ``pull_sub`` with ``batch`` set receive
        # the message wrapped in a single-element list.
        pull_sub = getattr(subscriber, "pull_sub", None)
        payload: Union[List[PatchedMessage], PatchedMessage] = (
            [incoming] if pull_sub and pull_sub.batch else incoming
        )

        with timeout_scope(rpc_timeout, raise_timeout):
            response = await self._execute_handler(payload, subject, subscriber)
            if rpc:
                parsed = await self._parser(response)
                return await self._decoder(parsed)

    return None

request async #

request(
    message,
    subject,
    *,
    correlation_id=None,
    headers=None,
    timeout=0.5,
    stream=None,
)
Source code in faststream/nats/testing.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    *,
    correlation_id: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 0.5,
    # NatsJSFastProducer compatibility
    stream: Optional[str] = None,
) -> "PatchedMessage":
    """Send a fake request to the first suitable subscriber and return its reply.

    Args:
        message: Payload passed to ``build_message``.
        subject: Subject the message is built for and matched against.
        correlation_id: Optional correlation id for the built message.
        headers: Optional headers for the built message.
        timeout: Deadline given to ``anyio.fail_after`` around the handler
            call.
        stream: Forwarded to ``_is_handler_suitable`` when selecting the
            subscriber.

    Returns:
        Whatever ``self._execute_handler`` returns for the first suitable
        subscriber.

    Raises:
        SubscriberNotFound: If no registered subscriber is suitable.
    """
    incoming = build_message(
        message=message,
        subject=subject,
        correlation_id=correlation_id,
        headers=headers,
    )

    for subscriber in self.broker._subscribers.values():  # pragma: no branch
        if not _is_handler_suitable(subscriber, subject, stream):
            continue

        # Subscribers exposing a ``pull_sub`` with ``batch`` set receive
        # the message wrapped in a single-element list.
        pull_sub = getattr(subscriber, "pull_sub", None)
        payload: Union[List[PatchedMessage], PatchedMessage] = (
            [incoming] if pull_sub and pull_sub.batch else incoming
        )

        with anyio.fail_after(timeout):
            return await self._execute_handler(payload, subject, subscriber)

    raise SubscriberNotFound