Skip to content

NatsFastProducer

faststream.nats.producer.NatsFastProducer #

NatsFastProducer(connection: Client, parser: Optional[AsyncCustomParser[Msg, NatsMessage]], decoder: Optional[AsyncCustomDecoder[NatsMessage]])

A class to represent a NATS producer.

Initialize the NATS producer.

PARAMETER DESCRIPTION
connection

The NATS connection.

TYPE: Client

parser

The parser.

TYPE: Optional[AsyncCustomParser[Msg, NatsMessage]]

decoder

The decoder.

TYPE: Optional[AsyncCustomDecoder[NatsMessage]]

Source code in faststream/nats/producer.py
def __init__(
    self,
    connection: Client,
    parser: Optional[AsyncCustomParser[Msg, NatsMessage]],
    decoder: Optional[AsyncCustomDecoder[NatsMessage]],
) -> None:
    """Initialize the NATS producer.

    Args:
        connection: The NATS connection.
        parser: The parser.
        decoder: The decoder.
    """
    self._connection = connection
    self._parser = resolve_custom_func(parser, Parser.parse_message)
    self._decoder = resolve_custom_func(decoder, Parser.decode_message)

publish async #

publish(message: SendableMessage, subject: str, headers: Optional[Dict[str, str]] = None, reply_to: str = '', correlation_id: Optional[str] = None, *, rpc: bool = False, rpc_timeout: Optional[float] = 30.0, raise_timeout: bool = False) -> Optional[Any]
Source code in faststream/nats/producer.py
async def publish(
    self,
    message: SendableMessage,
    subject: str,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    *,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[Any]:
    payload, content_type = encode_message(message)

    headers_to_send = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or str(uuid4()),
        **(headers or {}),
    }

    client = self._connection

    if rpc:
        if reply_to:
            raise WRONG_PUBLISH_ARGS

        token = client._nuid.next()
        token.extend(token_hex(2).encode())
        reply_to = token.decode()

        future: asyncio.Future[Msg] = asyncio.Future()
        sub = await client.subscribe(reply_to, future=future, max_msgs=1)
        await sub.unsubscribe(limit=1)

    await client.publish(
        subject=subject,
        payload=payload,
        reply=reply_to,
        headers=headers_to_send,
    )

    if rpc:
        msg: Any = None
        with timeout_scope(rpc_timeout, raise_timeout):
            msg = await future

        if msg:  # pragma: no branch
            if msg.headers:  # pragma: no cover # noqa: SIM102
                if (
                    msg.headers.get(nats.js.api.Header.STATUS)
                    == nats.aio.client.NO_RESPONDERS_STATUS
                ):
                    raise nats.errors.NoRespondersError
            return await self._decoder(await self._parser(msg))

    return None