Skip to content

FakeProducer

faststream.nats.test.FakeProducer #

FakeProducer(broker: NatsBroker)

Bases: NatsFastProducer

Source code in faststream/nats/test.py
def __init__(self, broker: NatsBroker) -> None:
    """Keep a reference to the given broker on the instance.

    Args:
        broker: The broker object stored as ``self.broker``.
    """
    self.broker = broker

broker instance-attribute #

broker = broker

publish async #

publish(
    message: SendableMessage,
    subject: str,
    reply_to: str = "",
    headers: Optional[Dict[str, str]] = None,
    stream: Optional[str] = None,
    correlation_id: Optional[str] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False
) -> Optional[SendableMessage]
Source code in faststream/nats/test.py
@override
async def publish(  # type: ignore[override]
    self,
    message: SendableMessage,
    subject: str,
    reply_to: str = "",
    headers: Optional[Dict[str, str]] = None,
    stream: Optional[str] = None,
    correlation_id: Optional[str] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
) -> Optional[SendableMessage]:
    """Deliver ``message`` to every broker handler whose subject matches.

    A message object is built once with ``build_message`` and then passed to
    ``call_handler`` for each handler in ``self.broker.handlers`` whose
    ``subject`` either equals ``subject`` or matches it as a NATS wildcard
    pattern (``*`` = exactly one token, ``>`` = one or more trailing tokens).

    Args:
        message: Payload handed to ``build_message``.
        subject: Dot-separated subject the message is published to.
        reply_to: Forwarded to ``build_message``.
        headers: Forwarded to ``build_message``.
        stream: Accepted for signature compatibility; not used here.
        correlation_id: Forwarded to ``build_message``.
        rpc: When true, return the result of the first matching handler
            instead of continuing to the remaining handlers.
        rpc_timeout: Forwarded to ``call_handler``.
        raise_timeout: Forwarded to ``call_handler``.

    Returns:
        The value returned by ``call_handler`` for the first matching
        handler when ``rpc`` is true; otherwise ``None``.
    """
    incoming = build_message(
        message=message,
        subject=subject,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )

    # Loop-invariant: tokenize the published subject only once.
    subject_tokens = subject.split(".")

    for handler in self.broker.handlers.values():  # pragma: no branch
        # An exact string match always delivers (this also covers a subject
        # that is literally identical to a wildcard pattern).
        call = subject == handler.subject

        if not call:
            call = True

            for current, base in zip_longest(
                subject_tokens,
                handler.subject.split("."),
                fillvalue=None,
            ):
                if current is None:
                    # The pattern has more tokens than the subject. Both
                    # `*` and `>` must consume at least one real token, so
                    # a padded (missing) token can never satisfy them.
                    call = False
                    break

                if base == ">":
                    # `>` swallows this token and everything after it.
                    break

                if base != "*" and current != base:
                    # Literal mismatch, or the subject is longer than the
                    # pattern (base is the None padding).
                    call = False
                    break

        if call:
            r = await call_handler(
                handler=handler,
                message=incoming,
                rpc=rpc,
                rpc_timeout=rpc_timeout,
                raise_timeout=raise_timeout,
            )

            if rpc:  # pragma: no branch
                return r

    return None

Last update: 2023-11-13