Skip to content

NatsFastProducer

faststream.nats.publisher.producer.NatsFastProducer #

NatsFastProducer(*, connection, parser, decoder)

Bases: ProducerProto

A class to represent a NATS producer.

Source code in faststream/nats/publisher/producer.py
def __init__(
    self,
    *,
    connection: "Client",
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    self._connection = connection

    default = NatsParser(pattern="", no_ack=False)
    self._parser = resolve_custom_func(parser, default.parse_message)
    self._decoder = resolve_custom_func(decoder, default.decode_message)

publish async #

publish(message, subject, *, correlation_id, headers=None, reply_to='', rpc=False, rpc_timeout=30.0, raise_timeout=False, **kwargs)
Source code in faststream/nats/publisher/producer.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    *,
    correlation_id: str,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    **kwargs: Any,  # suprress stream option
) -> Optional[Any]:
    payload, content_type = encode_message(message)

    headers_to_send = {
        "content-type": content_type or "",
        "correlation_id": correlation_id,
        **(headers or {}),
    }

    client = self._connection

    if rpc:
        if reply_to:
            raise WRONG_PUBLISH_ARGS

        reply_to = client.new_inbox()

        future: asyncio.Future[Msg] = asyncio.Future()
        sub = await client.subscribe(reply_to, future=future, max_msgs=1)
        await sub.unsubscribe(limit=1)

    await client.publish(
        subject=subject,
        payload=payload,
        reply=reply_to,
        headers=headers_to_send,
    )

    if rpc:
        msg: Any = None
        with timeout_scope(rpc_timeout, raise_timeout):
            msg = await future

        if msg:  # pragma: no branch
            if msg.headers:  # pragma: no cover # noqa: SIM102
                if (
                    msg.headers.get(nats.js.api.Header.STATUS)
                    == nats.aio.client.NO_RESPONDERS_STATUS
                ):
                    raise nats.errors.NoRespondersError
            return await self._decoder(await self._parser(msg))

    return None

request async #

request(message, subject, *, correlation_id, headers=None, timeout=0.5)
Source code in faststream/nats/publisher/producer.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    *,
    correlation_id: str,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 0.5,
) -> "Msg":
    payload, content_type = encode_message(message)

    headers_to_send = {
        "content-type": content_type or "",
        "correlation_id": correlation_id,
        **(headers or {}),
    }

    return await self._connection.request(
        subject=subject,
        payload=payload,
        headers=headers_to_send,
        timeout=timeout,
    )