Bases: NameRequired
A class to represent a Redis Stream subscriber.
Redis Stream subscriber parameters.
PARAMETER | DESCRIPTION |
stream | (str): Redis Stream name. TYPE: str |
polling_interval | (int:ms | None): wait message block. TYPE: Optional[PositiveInt] DEFAULT: 100 |
group | (str | None): consumer group name. TYPE: Optional[str] DEFAULT: None |
consumer | (str | None): consumer name. TYPE: Optional[str] DEFAULT: None |
batch | (bool): consume messages in batches. TYPE: bool DEFAULT: False |
no_ack | (bool): do not add message to PEL. TYPE: bool DEFAULT: False |
last_id | (str | None): start reading from this ID. TYPE: Optional[str] DEFAULT: None |
Source code in faststream/redis/schemas.py
| def __init__(
self,
stream: str,
polling_interval: Optional[PositiveInt] = 100,
group: Optional[str] = None,
consumer: Optional[str] = None,
batch: bool = False,
no_ack: bool = False,
last_id: Optional[str] = None,
) -> None:
"""Redis Stream subscriber parameters.
Args:
stream: (str): Redis Stream name.
polling_interval: (int:ms | None): wait message block.
group: (str | None): consumer group name.
consumer: (str | None): consumer name.
batch: (bool): consume messages in batches.
no_ack: (bool): do not add message to PEL.
last_id: (str | None): start reading from this ID.
"""
if (group and not consumer) or (not group and consumer):
raise ValueError("You should specify `group` and `consumer` both")
if group and consumer:
msg: Optional[str] = None
if last_id:
msg = "`last_id` has no effect with consumer group"
if no_ack:
msg = "`no_ack` has no effect with consumer group"
if msg:
warnings.warn(
message=msg,
category=RuntimeWarning,
stacklevel=1,
)
super().__init__(
name=stream,
group=group,
consumer=consumer,
polling_interval=polling_interval,
batch=batch,
no_ack=no_ack,
last_id=last_id or "$",
)
|
batch class-attribute
instance-attribute
consumer class-attribute
instance-attribute
group class-attribute
instance-attribute
last_id class-attribute
instance-attribute
name class-attribute
instance-attribute
no_ack class-attribute
instance-attribute
polling_interval class-attribute
instance-attribute
polling_interval: Optional[PositiveInt] = Field(
default=100, description="ms"
)
validate classmethod
validate(
value: Union[str, NameRequiredCls, None], **kwargs: Any
) -> Optional[NameRequiredCls]
Validates a value.
PARAMETER | DESCRIPTION |
value | The value to be validated. TYPE: Union[str, NameRequiredCls, None] |
**kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |
RETURNS | DESCRIPTION |
Optional[NameRequiredCls] | |
Source code in faststream/broker/schemas.py
| @classmethod
def validate(
cls: Type[NameRequiredCls],
value: Union[str, NameRequiredCls, None],
**kwargs: Any,
) -> Optional[NameRequiredCls]:
"""Validates a value.
Args:
value: The value to be validated.
**kwargs: Additional keyword arguments.
Returns:
The validated value.
"""
if value is not None and isinstance(value, str):
value = cls(value, **kwargs)
return value
|