KvWatch(
bucket,
headers_only=False,
include_history=False,
ignore_deletes=False,
meta_only=False,
inactive_threshold=None,
timeout=5.0,
declare=True,
)
Bases: NameRequired
A class to represent a NATS KV (key-value) watch subscription.
PARAMETER | DESCRIPTION |
bucket | Name of the KV bucket to watch. TYPE: str |
headers_only | Whether to receive only headers (default is False ). TYPE: bool DEFAULT: False |
include_history | Whether to include history (default is False ). TYPE: bool DEFAULT: False |
ignore_deletes | Whether to ignore deletes (default is False ). TYPE: bool DEFAULT: False |
meta_only | Whether to receive only metadata (default is False ). TYPE: bool DEFAULT: False |
inactive_threshold | Inactivity threshold (default is None). TYPE: Optional[float] DEFAULT: None |
timeout | Timeout in seconds (default is 5.0). TYPE: Optional[float] DEFAULT: 5.0 |
declare | Whether to create bucket automatically or just connect to it (default is True ). TYPE: bool DEFAULT: True |
Source code in faststream/nats/schemas/kv_watch.py
def __init__(
    self,
    bucket: str,
    headers_only: bool = False,
    include_history: bool = False,
    ignore_deletes: bool = False,
    meta_only: bool = False,
    inactive_threshold: Optional[float] = None,
    timeout: Optional[float] = 5.0,
    # custom
    declare: bool = True,
) -> None:
    """Store the settings of a NATS KV watch subscription.

    Args:
        bucket: Bucket name; forwarded to the parent initializer.
        headers_only: Whether to receive only headers.
        include_history: Whether to include history.
        ignore_deletes: Whether to ignore deletes.
        meta_only: Whether to receive only metadata.
        inactive_threshold: Inactivity threshold, or None.
        timeout: Timeout in seconds.
        declare: Whether to create the bucket automatically or just
            connect to it.
    """
    # The parent class receives the bucket name.
    super().__init__(bucket)

    # Flags selecting which updates the watcher receives.
    self.headers_only = headers_only
    self.meta_only = meta_only
    self.include_history = include_history
    self.ignore_deletes = ignore_deletes

    # Timing options.
    self.inactive_threshold = inactive_threshold
    self.timeout = timeout

    # FastStream-specific option (not one of the watch flags above).
    self.declare = declare
headers_only instance-attribute
headers_only = headers_only
include_history instance-attribute
include_history = include_history
ignore_deletes instance-attribute
ignore_deletes = ignore_deletes
meta_only instance-attribute
meta_only = meta_only
inactive_threshold instance-attribute
inactive_threshold = inactive_threshold
timeout instance-attribute
timeout = timeout
declare instance-attribute
declare = declare
validate classmethod
validate(
value: Union[str, NameRequiredCls], **kwargs: Any
) -> NameRequiredCls
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)
Factory to create object.
Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Build an instance from a name, or pass the value through.

    Args:
        value: A string is used as the first positional argument of
            ``cls``; any other value (including ``None``) is returned
            unchanged.
        **kwargs: Extra keyword arguments forwarded to ``cls`` when an
            instance is constructed from a string.

    Returns:
        A new ``cls`` instance when ``value`` is a string, otherwise
        ``value`` itself.
    """
    # Only plain strings are converted; None never passes isinstance().
    if isinstance(value, str):
        return cls(value, **kwargs)
    return value