Skip to content

ObjWatch

faststream.nats.schemas.ObjWatch #

ObjWatch(ignore_deletes=False, include_history=False, meta_only=False, timeout=5.0, declare=True)

A class to represent a NATS object storage watch subscription.

PARAMETER DESCRIPTION
ignore_deletes

Ignore delete events (default is False).

TYPE: bool DEFAULT: False

include_history

Include history (default is False).

TYPE: bool DEFAULT: False

meta_only

Only metadata (default is False).

TYPE: bool DEFAULT: False

timeout

The timeout for the watch in seconds (default is 5.0).

TYPE: float DEFAULT: 5.0

declare

Whether to create object storage automatically or just connect to it (default is True).

TYPE: bool DEFAULT: True

Source code in faststream/nats/schemas/obj_watch.py
def __init__(
    self,
    ignore_deletes: bool = False,
    include_history: bool = False,
    meta_only: bool = False,
    timeout: float = 5.0,
    # custom
    declare: bool = True,
) -> None:
    """Store the watch options on the instance.

    Args:
        ignore_deletes: Ignore delete events.
        include_history: Include history.
        meta_only: Only metadata.
        timeout: The timeout for the watch in seconds.
        declare: Whether to create object storage automatically or
            just connect to it.
    """
    # Targets are bound left to right, so the attributes are still set in
    # signature order.
    (
        self.ignore_deletes,
        self.include_history,
        self.meta_only,
        self.timeout,
    ) = (ignore_deletes, include_history, meta_only, timeout)

    # The option marked "custom" in the signature is kept apart from the rest.
    self.declare = declare

ignore_deletes instance-attribute #

ignore_deletes = ignore_deletes

include_history instance-attribute #

include_history = include_history

meta_only instance-attribute #

meta_only = meta_only

timeout instance-attribute #

timeout = timeout

declare instance-attribute #

declare = declare

validate classmethod #

validate(value: Literal[True]) -> ObjWatch
validate(value: Literal[False]) -> None
validate(value: ObjWatch) -> ObjWatch
validate(value: Union[bool, ObjWatch]) -> Optional[ObjWatch]
validate(value)
Source code in faststream/nats/schemas/obj_watch.py
@classmethod
def validate(cls, value: Union[bool, "ObjWatch"]) -> Optional["ObjWatch"]:
    """Normalize a ``bool``-or-instance watch option.

    Args:
        value: ``True`` to get a watch built with default settings,
            ``False`` to get no watch, or an already-built instance.

    Returns:
        A new default-constructed instance for ``True``, ``None`` for
        ``False``, and ``value`` itself, unchanged, for anything else.
    """
    # Identity checks (not truthiness) so only the bool singletons act as
    # flags; any other value is passed through as-is.
    if value is True:
        # Build via ``cls`` rather than a hard-coded class name so that a
        # subclass calling ``validate(True)`` gets an instance of itself.
        return cls()
    if value is False:
        return None
    return value