Skip to content

PatchedMessage

faststream.nats.testing.PatchedMessage #

Bases: Msg

ack async #

ack()
Source code in faststream/nats/testing.py
async def ack(self) -> None:
    pass

ack_sync async #

ack_sync(timeout=1)
Source code in faststream/nats/testing.py
async def ack_sync(
    self,
    timeout: float = 1,
) -> "PatchedMessage":  # pragma: no cover
    return self

nak async #

nak(delay=None)
Source code in faststream/nats/testing.py
async def nak(self, delay: Union[int, float, None] = None) -> None:
    pass

term async #

term()
Source code in faststream/nats/testing.py
async def term(self) -> None:
    pass

in_progress async #

in_progress()
Source code in faststream/nats/testing.py
async def in_progress(self) -> None:
    pass