Bases: NameRequired
A class to represent a JetStream stream.
Initialize the JetStream stream.
PARAMETER | DESCRIPTION |
name | TYPE: str |
*args | TYPE: Any DEFAULT: () |
declare | Whether to declare the stream. TYPE: bool DEFAULT: True |
**kwargs | TYPE: Any DEFAULT: {} |
Source code in faststream/nats/js_stream.py
| def __init__(
self,
name: str,
*args: Any,
declare: bool = True,
**kwargs: Any,
) -> None:
"""Initialize the JetStream stream.
Args:
name: The stream name.
*args: The arguments.
declare: Whether to declare the stream.
**kwargs: The keyword arguments.
"""
super().__init__(
name=name,
declare=declare,
subjects=[],
config=StreamConfig(
*args,
name=name,
**kwargs, # type: ignore[misc]
),
)
|
config instance-attribute
declare class-attribute
instance-attribute
declare: bool = Field(default=True)
name class-attribute
instance-attribute
subjects class-attribute
instance-attribute
validate classmethod
validate(
value: Union[str, NameRequiredCls, None], **kwargs: Any
) -> Optional[NameRequiredCls]
Validates a value.
PARAMETER | DESCRIPTION |
value | The value to be validated. TYPE: Union[str, NameRequiredCls, None] |
**kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |
RETURNS | DESCRIPTION |
Optional[NameRequiredCls] | |
Source code in faststream/broker/schemas.py
| @classmethod
def validate(
cls: Type[NameRequiredCls],
value: Union[str, NameRequiredCls, None],
**kwargs: Any,
) -> Optional[NameRequiredCls]:
"""Validates a value.
Args:
value: The value to be validated.
**kwargs: Additional keyword arguments.
Returns:
The validated value.
"""
if value is not None and isinstance(value, str):
value = cls(value, **kwargs)
return value
|