Skip to content

KvWatch

faststream.nats.KvWatch #

KvWatch(bucket, headers_only=False, include_history=False, ignore_deletes=False, meta_only=False, inactive_threshold=None, timeout=5.0, declare=True)

Bases: NameRequired

A class to represent a NATS kv watch subscription.

PARAMETER DESCRIPTION
bucket

Bucket name.

TYPE: str

headers_only

Whether to receive only headers (default is False).

TYPE: bool DEFAULT: False

include_history

Whether to include history (default is False).

TYPE: bool DEFAULT: False

ignore_deletes

Whether to ignore deletes (default is False).

TYPE: bool DEFAULT: False

meta_only

Whether to receive only metadata (default is False).

TYPE: bool DEFAULT: False

inactive_threshold

Inactivity threshold (default is None).

TYPE: Optional[float] DEFAULT: None

timeout

Timeout in seconds (default is 5.0).

TYPE: Optional[float] DEFAULT: 5.0

declare

Whether to create bucket automatically or just connect to it (default is True).

TYPE: bool DEFAULT: True

Source code in faststream/nats/schemas/kv_watch.py
def __init__(
    self,
    bucket: str,
    headers_only: bool = False,
    include_history: bool = False,
    ignore_deletes: bool = False,
    meta_only: bool = False,
    inactive_threshold: Optional[float] = None,
    timeout: Optional[float] = 5.0,
    # custom
    declare: bool = True,
) -> None:
    """Store the settings of a NATS key-value watch subscription.

    Args:
        bucket: Bucket name; forwarded to the parent initializer.
        headers_only: Whether to receive only headers.
        include_history: Whether to include history.
        ignore_deletes: Whether to ignore deletes.
        meta_only: Whether to receive only metadata.
        inactive_threshold: Inactivity threshold, or None.
        timeout: Timeout in seconds, or None.
        declare: Whether to create the bucket automatically or just
            connect to it.
    """
    # The parent initializer receives the bucket name as its only argument.
    super().__init__(bucket)

    # Custom flag, kept apart from the watch options below.
    self.declare = declare

    # Timing-related options.
    self.timeout = timeout
    self.inactive_threshold = inactive_threshold

    # Boolean switches selecting what the watch delivers.
    self.meta_only = meta_only
    self.ignore_deletes = ignore_deletes
    self.include_history = include_history
    self.headers_only = headers_only

name instance-attribute #

name = name

headers_only instance-attribute #

headers_only = headers_only

include_history instance-attribute #

include_history = include_history

ignore_deletes instance-attribute #

ignore_deletes = ignore_deletes

meta_only instance-attribute #

meta_only = meta_only

inactive_threshold instance-attribute #

inactive_threshold = inactive_threshold

timeout instance-attribute #

timeout = timeout

declare instance-attribute #

declare = declare

validate classmethod #

validate(value, **kwargs)

Factory to create object.

Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Build an instance from a name string, or pass the value through.

    Args:
        value: A string is used as the first positional argument to
            ``cls``; any other value (including None) is returned as is.
        **kwargs: Extra keyword arguments forwarded to ``cls`` when an
            instance is constructed.

    Returns:
        A new ``cls`` instance when ``value`` is a string, otherwise
        ``value`` unchanged.
    """
    # Anything that is not a string (None included) needs no conversion.
    if not isinstance(value, str):
        return value
    return cls(value, **kwargs)