Skip to content

StreamSub

faststream.redis.StreamSub #

StreamSub(stream: str, polling_interval: Optional[PositiveInt] = 100, group: Optional[str] = None, consumer: Optional[str] = None, batch: bool = False, no_ack: bool = False, last_id: Optional[str] = None, maxlen: Optional[PositiveInt] = None, max_records: Optional[PositiveInt] = None)

Bases: NameRequired

A class to represent a Redis Stream subscriber.

Redis Stream subscriber parameters.

PARAMETER DESCRIPTION
stream

(str): Redis Stream name.

TYPE: str

polling_interval

(int:ms | None): wait message block.

TYPE: Optional[PositiveInt] DEFAULT: 100

group

(str | None): consumer group name.

TYPE: Optional[str] DEFAULT: None

consumer

(str | None): consumer name.

TYPE: Optional[str] DEFAULT: None

batch

(bool): consume messages in batches.

TYPE: bool DEFAULT: False

max_records

(int | None): consuming batch size.

TYPE: Optional[PositiveInt] DEFAULT: None

no_ack

(bool): do not add message to PEL.

TYPE: bool DEFAULT: False

last_id

(str | None): start reading from this ID.

TYPE: Optional[str] DEFAULT: None

maxlen

(int | None): truncate old stream members beyond this size.

TYPE: Optional[PositiveInt] DEFAULT: None

Source code in faststream/redis/schemas.py
def __init__(
    self,
    stream: str,
    polling_interval: Optional[PositiveInt] = 100,
    group: Optional[str] = None,
    consumer: Optional[str] = None,
    batch: bool = False,
    no_ack: bool = False,
    last_id: Optional[str] = None,
    maxlen: Optional[PositiveInt] = None,
    max_records: Optional[PositiveInt] = None,
) -> None:
    """Redis Stream subscriber parameters.

    Args:
        stream: (str): Redis Stream name.
        polling_interval: (int:ms | None): wait message block.
        group: (str | None): consumer group name.
        consumer: (str | None): consumer name.
        batch: (bool): consume messages in batches.
        max_records: (int | None): consuming batch size.
        no_ack: (bool): do not add message to PEL.
        last_id: (str | None): start reading from this ID.
        maxlen: (int | None): truncate old stream members beyond this size.
    """
    if (group and not consumer) or (not group and consumer):
        raise ValueError("You should specify `group` and `consumer` both")

    if group and consumer and no_ack:
        warnings.warn(
            message="`no_ack` has no effect with consumer group",
            category=RuntimeWarning,
            stacklevel=1,
        )

    super().__init__(
        name=stream,
        group=group,
        consumer=consumer,
        polling_interval=polling_interval,
        batch=batch,
        no_ack=no_ack,
        last_id=last_id or "$",
        maxlen=maxlen,
        max_records=max_records,
    )

batch class-attribute instance-attribute #

batch: bool = False

consumer class-attribute instance-attribute #

consumer: Optional[str] = None

group class-attribute instance-attribute #

group: Optional[str] = None

last_id class-attribute instance-attribute #

last_id: str = '$'

max_records class-attribute instance-attribute #

max_records: Optional[PositiveInt] = None

maxlen class-attribute instance-attribute #

maxlen: Optional[PositiveInt] = None

name class-attribute instance-attribute #

name: str = Field(...)

no_ack class-attribute instance-attribute #

no_ack: bool = False

polling_interval class-attribute instance-attribute #

polling_interval: Optional[PositiveInt] = Field(default=100, description='ms')

validate classmethod #

validate(value: Union[str, NameRequiredCls, None], **kwargs: Any) -> Optional[NameRequiredCls]

Validates a value.

PARAMETER DESCRIPTION
value

The value to be validated.

TYPE: Union[str, NameRequiredCls, None]

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Optional[NameRequiredCls]

The validated value.

Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Validates a value.

    Args:
        value: The value to be validated.
        **kwargs: Additional keyword arguments.

    Returns:
        The validated value.

    """
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value