Skip to content

ListSub

faststream.redis.schemas.ListSub #

ListSub(
    channel: str,
    batch: bool = False,
    max_records: PositiveInt = 10,
    polling_interval: PositiveFloat = 0.1,
)

Bases: NameRequired

A class to represent a Redis List subscriber.

Redis List subscriber parameters.

PARAMETER DESCRIPTION
channel

(str): Redis List name.

TYPE: str

batch

(bool): consume messages in batches.

TYPE: bool DEFAULT: False

max_records

(int): max records per batch.

TYPE: PositiveInt DEFAULT: 10

polling_interval

(float): wait message block.

TYPE: PositiveFloat DEFAULT: 0.1

Source code in faststream/redis/schemas.py
def __init__(
    self,
    channel: str,
    batch: bool = False,
    max_records: PositiveInt = 10,
    polling_interval: PositiveFloat = 0.1,
) -> None:
    """Redis List subscriber parameters.

    Args:
        channel: (str): Redis List name.
        batch: (bool): consume messages in batches.
        max_records: (int): max records per batch.
        polling_interval: (float): wait message block.
    """
    super().__init__(
        name=channel,
        batch=batch,
        max_records=max_records,
        polling_interval=polling_interval,
    )

batch class-attribute instance-attribute #

batch: bool = False

max_records class-attribute instance-attribute #

max_records: PositiveInt = 10

name class-attribute instance-attribute #

name: str = Field(...)

polling_interval class-attribute instance-attribute #

polling_interval: PositiveFloat = 0.1

records property #

records: Optional[PositiveInt]

validate classmethod #

validate(
    value: Union[str, NameRequiredCls, None], **kwargs: Any
) -> Optional[NameRequiredCls]

Validates a value.

PARAMETER DESCRIPTION
value

The value to be validated.

TYPE: Union[str, NameRequiredCls, None]

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Optional[NameRequiredCls]

The validated value.

Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Validates a value.

    Args:
        value: The value to be validated.
        **kwargs: Additional keyword arguments.

    Returns:
        The validated value.

    """
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value