@override
async def publish( # type: ignore[override]
self,
message: "SendableMessage",
subject: str,
*,
correlation_id: str,
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
stream: Optional[str] = None,
timeout: Optional[float] = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
) -> Optional[Any]:
payload, content_type = encode_message(message)
headers_to_send = {
"content-type": content_type or "",
"correlation_id": correlation_id,
**(headers or {}),
}
if rpc:
if reply_to:
raise WRONG_PUBLISH_ARGS
reply_to = self._connection._nc.new_inbox()
future: asyncio.Future[Msg] = asyncio.Future()
sub = await self._connection._nc.subscribe(
reply_to, future=future, max_msgs=1
)
await sub.unsubscribe(limit=1)
if reply_to:
headers_to_send.update({"reply_to": reply_to})
await self._connection.publish(
subject=subject,
payload=payload,
headers=headers_to_send,
stream=stream,
timeout=timeout,
)
if rpc:
msg: Any = None
with timeout_scope(rpc_timeout, raise_timeout):
msg = await future
if msg: # pragma: no branch
if msg.headers: # pragma: no cover # noqa: SIM102
if (
msg.headers.get(nats.js.api.Header.STATUS)
== nats.aio.client.NO_RESPONDERS_STATUS
):
raise nats.errors.NoRespondersError
return await self._decoder(await self._parser(msg))
return None