Skip to content

NatsJSFastProducer

faststream.nats.producer.NatsJSFastProducer #

NatsJSFastProducer(
    connection: JetStreamContext,
    parser: Optional[AsyncCustomParser[Msg, NatsMessage]],
    decoder: Optional[AsyncCustomDecoder[NatsMessage]],
)

A class to represent a NATS JetStream producer.

Initialize the NATS JetStream producer.

PARAMETER DESCRIPTION
connection

The NATS JetStream connection.

TYPE: JetStreamContext

parser

The parser.

TYPE: Optional[AsyncCustomParser[Msg, NatsMessage]]

decoder

The decoder.

TYPE: Optional[AsyncCustomDecoder[NatsMessage]]

Source code in faststream/nats/producer.py
def __init__(
    self,
    connection: JetStreamContext,
    parser: Optional[AsyncCustomParser[Msg, NatsMessage]],
    decoder: Optional[AsyncCustomDecoder[NatsMessage]],
) -> None:
    """Initialize the NATS JetStream producer.

    Args:
        connection: The NATS JetStream connection.
        parser: The parser.
        decoder: The decoder.
    """
    self._connection = connection
    self._parser = resolve_custom_func(parser, Parser.parse_message)
    self._decoder = resolve_custom_func(decoder, Parser.decode_message)

publish async #

publish(
    message: SendableMessage,
    subject: str,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    stream: Optional[str] = None,
    timeout: Optional[float] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False
) -> Optional[Any]
Source code in faststream/nats/producer.py
async def publish(
    self,
    message: SendableMessage,
    subject: str,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    stream: Optional[str] = None,
    timeout: Optional[float] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[Any]:
    payload, content_type = encode_message(message)

    headers_to_send = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or str(uuid4()),
        **(headers or {}),
    }

    if rpc:
        if reply_to:
            raise WRONG_PUBLISH_ARGS

        reply_to = str(uuid4())
        future: asyncio.Future[Msg] = asyncio.Future()
        sub = await self._connection._nc.subscribe(
            reply_to, future=future, max_msgs=1
        )
        await sub.unsubscribe(limit=1)

    if reply_to:
        headers_to_send.update({"reply_to": reply_to})

    await self._connection.publish(
        subject=subject,
        payload=payload,
        headers=headers_to_send,
        stream=stream,
        timeout=timeout,
    )

    if rpc:
        msg: Any = None
        with timeout_scope(rpc_timeout, raise_timeout):
            msg = await future

        if msg:  # pragma: no branch
            if msg.headers:  # pragma: no cover # noqa: SIM102
                if (
                    msg.headers.get(nats.js.api.Header.STATUS)
                    == nats.aio.client.NO_RESPONDERS_STATUS
                ):
                    raise nats.errors.NoRespondersError
            return await self._decoder(await self._parser(msg))

    return None