Skip to content

StreamSource

nats.js.api.StreamSource dataclass #

Bases: Base

external class-attribute instance-attribute #

external: Optional[ExternalStream] = None

filter_subject class-attribute instance-attribute #

filter_subject: Optional[str] = None

name instance-attribute #

name: str

opt_start_seq class-attribute instance-attribute #

opt_start_seq: Optional[int] = None

as_dict #

as_dict() -> Dict[str, object]

Return the object converted into an API-friendly dict.

Source code in nats/js/api.py
def as_dict(self) -> Dict[str, object]:
    """Build a plain dict of this object's fields for use in API payloads.

    Fields whose value is ``None`` are left out. A value that is itself a
    ``Base`` instance is replaced by the result of its own ``as_dict()``;
    every other value is stored unchanged under the field's name.
    """
    # Lazily pair each declared field name with its current value so the
    # attribute reads happen in field order, one at a time.
    named_values = (
        (spec.name, getattr(self, spec.name)) for spec in fields(self)
    )
    return {
        key: item.as_dict() if isinstance(item, Base) else item
        for key, item in named_values
        if item is not None
    }

evolve #

evolve(**params) -> _B

Return a copy of the instance with the passed values replaced.

Source code in nats/js/api.py
def evolve(self: _B, **params) -> _B:
    """Create a copy of this instance with selected values overridden.

    Each keyword argument in ``params`` names a field and supplies the value
    it should have in the copy. The work is delegated to ``replace``, which
    receives this instance and the keyword arguments unchanged.
    """
    updated = replace(self, **params)
    return updated

from_response classmethod #

from_response(resp: Dict[str, Any])
Source code in nats/js/api.py
@classmethod
def from_response(cls, resp: Dict[str, Any]):
    """Build an instance from a response dict.

    First hands ``resp`` to ``cls._convert`` together with the ``'external'``
    key and the ``ExternalStream`` type, then defers to the parent class's
    ``from_response`` with the same ``resp`` object.
    """
    # NOTE(review): presumably `_convert` turns the raw 'external' entry of
    # `resp` into an ExternalStream in place -- confirm against Base._convert.
    cls._convert(resp, 'external', ExternalStream)
    instance = super().from_response(resp)
    return instance