@override
async def publish( # type: ignore[override]
self,
message: "SendableMessage",
subject: str,
reply_to: str = "",
headers: Optional[Dict[str, str]] = None,
correlation_id: Optional[str] = None,
# NatsJSFastProducer compatibility
timeout: Optional[float] = None,
stream: Optional[str] = None,
*,
rpc: bool = False,
rpc_timeout: Optional[float] = None,
raise_timeout: bool = False,
) -> Any:
if rpc and reply_to:
raise WRONG_PUBLISH_ARGS
incoming = build_message(
message=message,
subject=subject,
headers=headers,
correlation_id=correlation_id,
reply_to=reply_to,
)
for handler in self.broker._subscribers.values(): # pragma: no branch
if _is_handler_suitable(handler, subject, stream):
msg: Union[List[PatchedMessage], PatchedMessage]
if (pull := getattr(handler, "pull_sub", None)) and pull.batch:
msg = [incoming]
else:
msg = incoming
with timeout_scope(rpc_timeout, raise_timeout):
response = await self._execute_handler(msg, subject, handler)
if rpc:
return await self._decoder(await self._parser(response))
return None