Skip to content

StreamSource

nats.js.api.StreamSource dataclass #

StreamSource(name, opt_start_seq=None, filter_subject=None, external=None, subject_transforms=None)

Bases: Base

name instance-attribute #

name

opt_start_seq class-attribute instance-attribute #

opt_start_seq = None

filter_subject class-attribute instance-attribute #

filter_subject = None

external class-attribute instance-attribute #

external = None

subject_transforms class-attribute instance-attribute #

subject_transforms = None

evolve #

evolve(**params)

Return a copy of the instance with the passed values replaced.

Source code in nats/js/api.py
def evolve(self: _B, **params) -> _B:
    """Return a copy of the instance with the passed values replaced.
    """
    return replace(self, **params)

from_response classmethod #

from_response(resp)
Source code in nats/js/api.py
@classmethod
def from_response(cls, resp: Dict[str, Any]):
    cls._convert(resp, 'external', ExternalStream)
    cls._convert(resp, 'subject_transforms', SubjectTransform)
    return super().from_response(resp)

as_dict #

as_dict()
Source code in nats/js/api.py
def as_dict(self) -> Dict[str, object]:
    result = super().as_dict()
    if self.subject_transforms:
        result['subject_transform'] = [
            tr.as_dict() for tr in self.subject_transforms
        ]
    return result