@override
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage",
    *,
    correlation_id: str,
    channel: Optional[str] = None,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    maxlen: Optional[int] = None,
    headers: Optional["AnyDict"] = None,
    reply_to: str = "",
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[Any]:
    """Publish a message to a Redis channel, list or stream.

    Exactly one destination is used, checked in the order ``channel``,
    ``list``, ``stream``. In RPC mode a temporary Pub/Sub subscription is
    opened on a freshly generated reply channel before the message is sent,
    and the reply read from it is parsed and decoded.

    Args:
        message: Payload handed to ``RawMessage.encode``.
        correlation_id: Correlation id embedded in the encoded message.
        channel: Pub/Sub channel to ``PUBLISH`` to.
        list: List key to ``RPUSH`` to.
        stream: Stream name to ``XADD`` to.
        maxlen: ``maxlen`` forwarded to ``xadd``; only used with ``stream``.
        headers: Headers embedded in the encoded message.
        reply_to: Reply destination embedded in the message. Must be empty
            when ``rpc`` is true, because it is generated internally.
        rpc: Wait for a reply and return it.
        rpc_timeout: Timeout passed to ``timeout_scope`` and to each
            ``get_message`` call while waiting for the reply.
        raise_timeout: Raise ``TimeoutError`` instead of returning ``None``
            when no reply was received.

    Returns:
        ``None`` when ``rpc`` is false or no reply arrived; otherwise the
        result of ``self._decoder(self._parser(reply))``.

    Raises:
        SetupError: If none of ``channel``, ``list``, ``stream`` is given.
        TimeoutError: If no reply arrived and ``raise_timeout`` is true.
        WRONG_PUBLISH_ARGS is raised as-is when both ``rpc`` and
        ``reply_to`` are set.
    """
    if not any((channel, list, stream)):
        raise SetupError(INCORRECT_SETUP_MSG)

    psub: Optional[PubSub] = None
    if rpc:
        if reply_to:
            raise WRONG_PUBLISH_ARGS

        # Subscribe to a unique reply channel *before* publishing so the
        # reply cannot be missed.
        reply_to = str(NUID().next(), "utf-8")
        psub = self._connection.pubsub()
        await psub.subscribe(reply_to)

    response = None
    try:
        msg = RawMessage.encode(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
        )

        if channel is not None:
            await self._connection.publish(channel, msg)
        elif list is not None:
            await self._connection.rpush(list, msg)
        elif stream is not None:
            await self._connection.xadd(
                name=stream,
                fields={DATA_KEY: msg},
                maxlen=maxlen,
            )
        else:
            raise AssertionError("unreachable")

        if psub is None:
            return None

        # NOTE(review): a ``None`` rpc_timeout is passed to get_message as
        # 0.0 -- confirm that this matches the intended "no timeout"
        # semantics of the Redis client.
        poll_timeout = rpc_timeout or 0.0
        with timeout_scope(rpc_timeout, raise_timeout):
            # The first read is discarded; presumably it consumes the
            # subscription confirmation -- TODO confirm.
            await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=poll_timeout,
            )

            # The second read is expected to carry the actual reply.
            response = await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=poll_timeout,
            )
    finally:
        # Release the temporary subscription on every exit path (publish
        # failure, timeout, cancellation), not only on success.
        if psub is not None:
            try:
                await psub.unsubscribe()
            finally:
                await psub.aclose()  # type: ignore[attr-defined]

    if response is None:
        if raise_timeout:
            raise TimeoutError()
        return None

    return await self._decoder(await self._parser(response))