Skip to content

FakeProducer

faststream.nats.test.FakeProducer #

FakeProducer(broker: NatsBroker)

Bases: NatsFastProducer

Source code in faststream/nats/test.py
def __init__(self, broker: NatsBroker) -> None:
    self.broker = broker

broker instance-attribute #

broker = broker

publish async #

publish(message: SendableMessage, subject: str, reply_to: str = '', headers: Optional[Dict[str, str]] = None, correlation_id: Optional[str] = None, timeout: Optional[float] = None, stream: Optional[str] = None, *, rpc: bool = False, rpc_timeout: Optional[float] = None, raise_timeout: bool = False) -> Optional[SendableMessage]
Source code in faststream/nats/test.py
@override
async def publish(  # type: ignore[override]
    self,
    message: SendableMessage,
    subject: str,
    reply_to: str = "",
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    # NatsJSFastProducer compatibility
    timeout: Optional[float] = None,
    stream: Optional[str] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
) -> Optional[SendableMessage]:
    incoming = build_message(
        message=message,
        subject=subject,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )

    for handler in self.broker.handlers.values():  # pragma: no branch
        call = False

        if stream and getattr(handler.stream, "name", None) != stream:
            continue

        if subject == handler.subject:
            call = True

        else:
            call = True

            for current, base in zip_longest(
                subject.split("."),
                handler.subject.split("."),
                fillvalue=None,
            ):
                if base == ">":
                    break

                if base != "*" and current != base:
                    call = False
                    break

        if call:
            r = await call_handler(
                handler=handler,
                message=[incoming]
                if getattr(handler.pull_sub, "batch", False)
                else incoming,
                rpc=rpc,
                rpc_timeout=rpc_timeout,
                raise_timeout=raise_timeout,
            )

            if rpc:
                return r

    return None