Skip to content

StreamConfig

nats.js.api.StreamConfig dataclass #

StreamConfig(
    name=None,
    description=None,
    subjects=None,
    retention=None,
    max_consumers=None,
    max_msgs=None,
    max_bytes=None,
    discard=OLD,
    max_age=None,
    max_msgs_per_subject=-1,
    max_msg_size=-1,
    storage=None,
    num_replicas=None,
    no_ack=False,
    template_owner=None,
    duplicate_window=0,
    placement=None,
    mirror=None,
    sources=None,
    sealed=False,
    deny_delete=False,
    deny_purge=False,
    allow_rollup_hdrs=False,
    republish=None,
    subject_transform=None,
    allow_direct=None,
    mirror_direct=None,
    compression=None,
    metadata=None,
)

Bases: Base

StreamConfig represents the configuration of a stream.

name class-attribute instance-attribute #

name = None

description class-attribute instance-attribute #

description = None

subjects class-attribute instance-attribute #

subjects = None

retention class-attribute instance-attribute #

retention = None

max_consumers class-attribute instance-attribute #

max_consumers = None

max_msgs class-attribute instance-attribute #

max_msgs = None

max_bytes class-attribute instance-attribute #

max_bytes = None

discard class-attribute instance-attribute #

discard = OLD

max_age class-attribute instance-attribute #

max_age = None

max_msgs_per_subject class-attribute instance-attribute #

max_msgs_per_subject = -1

max_msg_size class-attribute instance-attribute #

max_msg_size = -1

storage class-attribute instance-attribute #

storage = None

num_replicas class-attribute instance-attribute #

num_replicas = None

no_ack class-attribute instance-attribute #

no_ack = False

template_owner class-attribute instance-attribute #

template_owner = None

duplicate_window class-attribute instance-attribute #

duplicate_window = 0

placement class-attribute instance-attribute #

placement = None

mirror class-attribute instance-attribute #

mirror = None

sources class-attribute instance-attribute #

sources = None

sealed class-attribute instance-attribute #

sealed = False

deny_delete class-attribute instance-attribute #

deny_delete = False

deny_purge class-attribute instance-attribute #

deny_purge = False

allow_rollup_hdrs class-attribute instance-attribute #

allow_rollup_hdrs = False

republish class-attribute instance-attribute #

republish = None

subject_transform class-attribute instance-attribute #

subject_transform = None

allow_direct class-attribute instance-attribute #

allow_direct = None

mirror_direct class-attribute instance-attribute #

mirror_direct = None

compression class-attribute instance-attribute #

compression = None

metadata class-attribute instance-attribute #

metadata = None

evolve #

evolve(**params)

Return a copy of the instance with the passed values replaced.

Source code in nats/js/api.py
def evolve(self: _B, **params) -> _B:
    """Return a copy of this instance with the given fields replaced.

    Args:
        **params: Field names mapped to the values the copy should carry
            instead of the current ones.

    Returns:
        The object produced by ``replace(self, **params)``; this instance
        is passed to ``replace`` and is not assigned to here.
    """
    # NOTE(review): `replace` is presumably dataclasses.replace -- confirm
    # against the module's imports.
    updated = replace(self, **params)
    return updated

from_response classmethod #

from_response(resp)
Source code in nats/js/api.py
@classmethod
def from_response(cls, resp: Dict[str, Any]):
    """Build an instance from a response dictionary.

    Selected entries of ``resp`` are first passed through the class's
    conversion helpers, then the parent class's ``from_response`` is
    invoked with the same dictionary.

    Args:
        resp: Response payload keyed by field name. The helpers' return
            values are discarded, so they presumably rewrite ``resp`` in
            place -- TODO confirm against their definitions.

    Returns:
        Whatever the parent class's ``from_response`` returns for ``resp``.
    """
    # Entries handed to the nanosecond conversion helper.
    for duration_key in ('max_age', 'duplicate_window'):
        cls._convert_nanoseconds(resp, duration_key)

    # Entries handed to the generic converter, paired with their target type.
    nested_fields = (
        ('placement', Placement),
        ('mirror', StreamSource),
        ('sources', StreamSource),
        ('republish', RePublish),
        ('subject_transform', SubjectTransform),
    )
    for field_name, target_type in nested_fields:
        cls._convert(resp, field_name, target_type)

    return super().from_response(resp)

as_dict #

as_dict()
Source code in nats/js/api.py
def as_dict(self) -> Dict[str, object]:
    """Serialise this configuration into a dictionary.

    Starts from the parent class's ``as_dict`` output, overwrites the two
    duration entries with the result of ``_to_nanoseconds``, and, when
    ``sources`` is non-empty, replaces that entry with each source's own
    ``as_dict`` output. The compression and metadata settings are then
    validated before the dictionary is returned.

    Returns:
        The dictionary built as described above.

    Raises:
        ValueError: If ``compression`` is truthy and is neither
            ``StoreCompression.NONE`` nor ``StoreCompression.S2``, or if
            ``metadata`` is truthy and is not a ``dict``.
    """
    payload = super().as_dict()
    payload['duplicate_window'] = self._to_nanoseconds(self.duplicate_window)
    payload['max_age'] = self._to_nanoseconds(self.max_age)

    if self.sources:
        serialised_sources = []
        for source in self.sources:
            serialised_sources.append(source.as_dict())
        payload['sources'] = serialised_sources

    codec = self.compression
    if (
        codec
        and codec != StoreCompression.NONE
        and codec != StoreCompression.S2
    ):
        # %-formatting is kept deliberately: it renders the value via str().
        raise ValueError(
            "nats: invalid store compression type: %s" % codec
        )

    meta = self.metadata
    if meta and not isinstance(meta, dict):
        raise ValueError("nats: invalid metadata format")

    return payload