Skip to content

RePublish

nats.js.api.RePublish dataclass #

RePublish(src=None, dest=None, headers_only=None)

Bases: Base

RePublish is for republishing messages once committed to a stream. The original subject cis remapped from the subject pattern to the destination pattern.

src class-attribute instance-attribute #

src = None

dest class-attribute instance-attribute #

dest = None

headers_only class-attribute instance-attribute #

headers_only = None

from_response classmethod #

from_response(resp)

Read the class instance from a server response.

Unknown fields are ignored ("open-world assumption").

Source code in nats/js/api.py
@classmethod
def from_response(cls: type[_B], resp: Dict[str, Any]) -> _B:
    """Read the class instance from a server response.

    Unknown fields are ignored ("open-world assumption").
    """
    params = {}
    for field in fields(cls):
        if field.name in resp:
            params[field.name] = resp[field.name]
    return cls(**params)

evolve #

evolve(**params)

Return a copy of the instance with the passed values replaced.

Source code in nats/js/api.py
def evolve(self: _B, **params) -> _B:
    """Return a copy of the instance with the passed values replaced.
    """
    return replace(self, **params)

as_dict #

as_dict()

Return the object converted into an API-friendly dict.

Source code in nats/js/api.py
def as_dict(self) -> Dict[str, object]:
    """Return the object converted into an API-friendly dict.
    """
    result = {}
    for field in fields(self):
        val = getattr(self, field.name)
        if val is None:
            continue
        if isinstance(val, Base):
            val = val.as_dict()
        if isinstance(val, list):
            if len(val) > 0 and isinstance(val[0], Base):
                val = [v.as_dict() for v in val if isinstance(v, Base)]
        result[field.name] = val
    return result