Skip to content

PubSub

faststream.redis.PubSub #

PubSub(channel, pattern=False, polling_interval=1.0)

Bases: NameRequired

A class to represent a Redis PubSub channel.

Source code in faststream/redis/schemas/pub_sub.py
def __init__(
    self,
    channel: str,
    pattern: bool = False,
    polling_interval: float = 1.0,
) -> None:
    reg, path = compile_path(
        channel,
        replace_symbol="*",
        patch_regex=lambda x: x.replace(r"\*", ".*"),
    )

    if reg is not None:
        pattern = True

    super().__init__(path)

    self.path_regex = reg
    self.pattern = channel if pattern else None
    self.polling_interval = polling_interval

name instance-attribute #

name = name

path_regex instance-attribute #

path_regex = reg

pattern instance-attribute #

pattern = channel if pattern else None

polling_interval instance-attribute #

polling_interval = polling_interval

validate classmethod #

validate(value: Union[str, NameRequiredCls], **kwargs: Any) -> NameRequiredCls
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)

Factory to create object.

Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Factory to create object."""
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value