Skip to content

ConsumerConfig

nats.js.api.ConsumerConfig dataclass #

ConsumerConfig(name: Optional[str] = None, durable_name: Optional[str] = None, description: Optional[str] = None, deliver_policy: Optional[DeliverPolicy] = DeliverPolicy.ALL, opt_start_seq: Optional[int] = None, opt_start_time: Optional[int] = None, ack_policy: Optional[AckPolicy] = AckPolicy.EXPLICIT, ack_wait: Optional[float] = None, max_deliver: Optional[int] = None, filter_subject: Optional[str] = None, filter_subjects: Optional[List[str]] = None, replay_policy: Optional[ReplayPolicy] = ReplayPolicy.INSTANT, rate_limit_bps: Optional[int] = None, sample_freq: Optional[str] = None, max_waiting: Optional[int] = None, max_ack_pending: Optional[int] = None, flow_control: Optional[bool] = None, idle_heartbeat: Optional[float] = None, headers_only: Optional[bool] = None, deliver_subject: Optional[str] = None, deliver_group: Optional[str] = None, inactive_threshold: Optional[float] = None, num_replicas: Optional[int] = None, mem_storage: Optional[bool] = None)

Bases: Base

Consumer configuration.

References
  • Consumers: https://docs.nats.io/jetstream/concepts/consumers

ack_policy class-attribute instance-attribute #

ack_policy: Optional[AckPolicy] = EXPLICIT

ack_wait class-attribute instance-attribute #

ack_wait: Optional[float] = None

deliver_group class-attribute instance-attribute #

deliver_group: Optional[str] = None

deliver_policy class-attribute instance-attribute #

deliver_policy: Optional[DeliverPolicy] = ALL

deliver_subject class-attribute instance-attribute #

deliver_subject: Optional[str] = None

description class-attribute instance-attribute #

description: Optional[str] = None

durable_name class-attribute instance-attribute #

durable_name: Optional[str] = None

filter_subject class-attribute instance-attribute #

filter_subject: Optional[str] = None

filter_subjects class-attribute instance-attribute #

filter_subjects: Optional[List[str]] = None

flow_control class-attribute instance-attribute #

flow_control: Optional[bool] = None

headers_only class-attribute instance-attribute #

headers_only: Optional[bool] = None

idle_heartbeat class-attribute instance-attribute #

idle_heartbeat: Optional[float] = None

inactive_threshold class-attribute instance-attribute #

inactive_threshold: Optional[float] = None

max_ack_pending class-attribute instance-attribute #

max_ack_pending: Optional[int] = None

max_deliver class-attribute instance-attribute #

max_deliver: Optional[int] = None

max_waiting class-attribute instance-attribute #

max_waiting: Optional[int] = None

mem_storage class-attribute instance-attribute #

mem_storage: Optional[bool] = None

name class-attribute instance-attribute #

name: Optional[str] = None

num_replicas class-attribute instance-attribute #

num_replicas: Optional[int] = None

opt_start_seq class-attribute instance-attribute #

opt_start_seq: Optional[int] = None

opt_start_time class-attribute instance-attribute #

opt_start_time: Optional[int] = None

rate_limit_bps class-attribute instance-attribute #

rate_limit_bps: Optional[int] = None

replay_policy class-attribute instance-attribute #

replay_policy: Optional[ReplayPolicy] = INSTANT

sample_freq class-attribute instance-attribute #

sample_freq: Optional[str] = None

as_dict #

as_dict() -> Dict[str, object]
Source code in nats/js/api.py
def as_dict(self) -> Dict[str, object]:
    """Return the parent's dict form with the duration fields re-encoded.

    Starts from ``super().as_dict()`` and then overwrites the ``ack_wait``,
    ``idle_heartbeat`` and ``inactive_threshold`` entries with the result of
    passing the matching attribute through ``self._to_nanoseconds``.

    Returns:
        The dict produced by the parent class, with those three keys replaced.
    """
    payload = super().as_dict()
    # These are the attributes that go through the nanosecond helper; the
    # keys are always written, whatever the helper returns.
    for key in ('ack_wait', 'idle_heartbeat', 'inactive_threshold'):
        payload[key] = self._to_nanoseconds(getattr(self, key))
    return payload

evolve #

evolve(**params) -> _B

Return a copy of the instance with the passed values replaced.

Source code in nats/js/api.py
def evolve(self: _B, **params) -> _B:
    """Build a modified copy of this instance.

    Args:
        **params: Field names mapped to the values they should take in the
            copy; forwarded unchanged to ``dataclasses.replace``.

    Returns:
        A new object of the same type; the receiver is left untouched.
    """
    updated = replace(self, **params)
    return updated

from_response classmethod #

from_response(resp: Dict[str, Any])
Source code in nats/js/api.py
@classmethod
def from_response(cls, resp: Dict[str, Any]):
    """Build an instance from an API response dict.

    Calls ``cls._convert_nanoseconds`` on ``resp`` for the ``ack_wait``,
    ``idle_heartbeat`` and ``inactive_threshold`` keys, then hands ``resp``
    to the parent class's ``from_response``.

    Args:
        resp: Response payload as a dict. The helper's return value is
            discarded, so it presumably rewrites ``resp`` in place --
            confirm against ``_convert_nanoseconds``.

    Returns:
        Whatever the parent ``from_response`` returns for ``resp``.
    """
    for key in ('ack_wait', 'idle_heartbeat', 'inactive_threshold'):
        cls._convert_nanoseconds(resp, key)
    return super().from_response(resp)