Bases: NameRequired
A class to represent a Redis PubSub channel.
Redis PubSub channel parameters.
PARAMETER | DESCRIPTION | TYPE | DEFAULT |
channel | Redis PubSub channel name. | str | required |
pattern | Whether to use pattern matching. | bool | False |
polling_interval | How long to block while waiting for a message. | PositiveFloat | 1.0 |
Source code in faststream/redis/schemas.py
def __init__(
    self,
    channel: str,
    pattern: bool = False,
    polling_interval: PositiveFloat = 1.0,
) -> None:
    """Redis PubSub channel parameters.

    Args:
        channel: (str): Redis PubSub channel name.
        pattern: (bool): use pattern matching.
        polling_interval: (float): wait message block.
    """
    reg, path = compile_path(
        channel,
        replace_symbol="*",
        patch_regex=lambda x: x.replace(r"\*", ".*"),
    )
    if reg is not None:
        pattern = True
    super().__init__(
        name=path,
        path_regex=reg,
        pattern=pattern,
        polling_interval=polling_interval,
    )
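The `patch_regex` lambda in the listing above rewrites an escaped `*` into the regex wildcard `.*`. The sketch below shows only that step in isolation. It assumes the channel text has already been regex-escaped (here with `re.escape`); how `compile_path` actually escapes and assembles the expression is not shown on this page, so treat the surrounding steps as an illustration rather than the library's implementation.

```python
import re

# The same lambda that PubSub.__init__ passes as patch_regex.
patch_regex = lambda x: x.replace(r"\*", ".*")

# Assumption: the channel name is regex-escaped before patching.
# re.escape stands in for whatever compile_path does internally.
escaped = re.escape("logs.*")
patched = patch_regex(escaped)

channel_re = re.compile(patched)

print(escaped)
print(patched)
print(bool(channel_re.fullmatch("logs.error")))
print(bool(channel_re.fullmatch("metrics.cpu")))
```

Under that assumption, a glob-style channel such as `logs.*` becomes a regular expression that accepts any channel name starting with `logs.` and rejects unrelated names.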
last_id class-attribute instance-attribute
model_config class-attribute instance-attribute
model_config = {'arbitrary_types_allowed': True}
name class-attribute instance-attribute
path_regex class-attribute instance-attribute
pattern class-attribute instance-attribute
polling_interval class-attribute instance-attribute
polling_interval: PositiveFloat = 1.0
Config
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
validate classmethod
validate(
value: Union[str, NameRequiredCls, None], **kwargs: Any
) -> Optional[NameRequiredCls]
Validates a value.
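PARAMETER | DESCRIPTION | TYPE | DEFAULT |
value | The value to be validated. | Union[str, NameRequiredCls, None] | required |
**kwargs | Additional keyword arguments. | Any | {} |
RETURNS | DESCRIPTION |
Optional[NameRequiredCls] | The validated value: a new instance built from value if it is a string, otherwise value unchanged. |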
Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Validates a value.

    Args:
        value: The value to be validated.
        **kwargs: Additional keyword arguments.

    Returns:
        The validated value.
    """
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value
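The coercion in `validate` can be tried without FastStream installed. The sketch below uses `ChannelSketch`, a hypothetical stand-in class that is not part of the library, and copies only the body of `validate` from the listing above.

```python
from typing import Any, Optional, Union


class ChannelSketch:
    """Hypothetical stand-in for a NameRequired subclass; not FastStream code."""

    def __init__(self, name: str, pattern: bool = False) -> None:
        self.name = name
        self.pattern = pattern

    @classmethod
    def validate(
        cls,
        value: Union[str, "ChannelSketch", None],
        **kwargs: Any,
    ) -> Optional["ChannelSketch"]:
        # Same logic as the listing above: only plain strings are wrapped.
        if value is not None and isinstance(value, str):
            value = cls(value, **kwargs)
        return value


# A string is turned into an instance, and kwargs go to the constructor.
from_str = ChannelSketch.validate("events", pattern=True)

# An existing instance and None are returned unchanged.
existing = ChannelSketch("events")
passthrough = ChannelSketch.validate(existing)
nothing = ChannelSketch.validate(None)
```

Note that `**kwargs` only take effect when `value` is a string. If an instance is passed, it is returned as-is and the extra keyword arguments are ignored.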