StreamSub(
stream,
polling_interval=100,
group=None,
consumer=None,
batch=False,
no_ack=False,
last_id=None,
maxlen=None,
max_records=None,
)
Bases: NameRequired
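A class to represent a Redis Stream subscriber. `group` and `consumer` must be specified together (otherwise `SetupError` is raised), and `last_id` defaults to "$" when it is not provided.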
Source code in faststream/redis/schemas/stream_sub.py
def __init__(
    self,
    stream: str,
    polling_interval: Optional[int] = 100,
    group: Optional[str] = None,
    consumer: Optional[str] = None,
    batch: bool = False,
    no_ack: bool = False,
    last_id: Optional[str] = None,
    maxlen: Optional[int] = None,
    max_records: Optional[int] = None,
) -> None:
    """Set up the options of a Redis stream subscription.

    Args:
        stream: Stream name; forwarded to the base-class initializer.
        polling_interval: Stored unchanged on the instance.
        group: Consumer group name; must be given together with `consumer`.
        consumer: Consumer name; must be given together with `group`.
        batch: Stored unchanged on the instance.
        no_ack: Stored unchanged on the instance. A `RuntimeWarning` is
            emitted when it is truthy and a consumer group is configured.
        last_id: Stored on the instance; replaced by "$" when `None`.
            NOTE(review): "$" is presumably the Redis "only new entries"
            stream ID -- confirm against the Redis XREAD documentation.
        maxlen: Stored unchanged on the instance.
        max_records: Stored unchanged on the instance.

    Raises:
        SetupError: If exactly one of `group` and `consumer` is truthy.
    """
    has_group = bool(group)
    has_consumer = bool(consumer)

    # `group` and `consumer` only make sense as a pair.
    if has_group != has_consumer:
        raise SetupError("You should specify `group` and `consumer` both")

    if has_group and has_consumer and no_ack:
        warnings.warn(
            message="`no_ack` has no effect with consumer group",
            category=RuntimeWarning,
            stacklevel=1,
        )

    super().__init__(stream)

    self.group = group
    self.consumer = consumer
    self.polling_interval = polling_interval
    self.batch = batch
    self.no_ack = no_ack
    # Fall back to "$" when the caller did not pick a starting ID.
    self.last_id = "$" if last_id is None else last_id
    self.maxlen = maxlen
    self.max_records = max_records
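group instance-attribute
group = group
consumer instance-attribute
consumer = consumer
polling_interval instance-attribute
polling_interval = polling_interval
batch instance-attribute
batch = batch
no_ack instance-attribute
no_ack = no_ack
last_id instance-attribute
last_id = last_id
maxlen instance-attribute
maxlen = maxlen
max_records instance-attribute
max_records = max_records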
validate classmethod
validate(
value: Union[str, NameRequiredCls], **kwargs: Any
) -> NameRequiredCls
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)
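Factory to create an object: a string `value` is passed to the class constructor together with `**kwargs`; any other value (including `None`) is returned unchanged.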
Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Build an instance from a string, or pass any other value through.

    Args:
        value: A string to construct an instance from, an already-built
            object, or `None`.
        **kwargs: Extra keyword arguments forwarded to the constructor
            when `value` is a string.

    Returns:
        `cls(value, **kwargs)` when `value` is a string; otherwise `value`
        itself, unchanged (including `None`).
    """
    # `isinstance` is already False for None, so no separate None check.
    if isinstance(value, str):
        return cls(value, **kwargs)
    return value