Skip to content

StreamBuilder

faststream.nats.helpers.StreamBuilder #

StreamBuilder()
Source code in faststream/nats/helpers.py
def __init__(self) -> None:
    """Initialize the builder with an empty stream registry."""
    # Maps a stream's name to its JStream object; entries are added by
    # ``stream()`` the first time a given name is seen.
    self.streams = {}

streams instance-attribute #

streams: Dict[str, JStream] = {}

stream #

stream(
    name: Union[str, JStream, None],
    *args: Any,
    declare: bool = True,
    **kwargs: Any
) -> Optional[JStream]
Source code in faststream/nats/helpers.py
def stream(
    self,
    name: Union[str, JStream, None],
    *args: Any,
    declare: bool = True,
    **kwargs: Any,
) -> Optional[JStream]:
    """Return the registered stream for ``name``, registering it if new.

    ``name`` is passed through ``JStream.validate``. If that yields ``None``,
    ``None`` is returned and the registry is left untouched. Otherwise the
    result is looked up in ``self.streams`` by its ``name`` attribute: an
    existing entry is returned as-is, and a missing one is stored first.

    Args:
        name: Stream name, ``JStream`` instance, or ``None``.
        *args: Accepted but not used by this method body.
        declare: Accepted but not used by this method body.
        **kwargs: Accepted but not used by this method body.

    Returns:
        The ``JStream`` held in the registry under that name, or ``None``
        when validation produced ``None``.
    """
    candidate = JStream.validate(name)
    if candidate is None:
        return None

    # First registration wins: an already-known name keeps its original
    # object, so repeated calls with the same name share one instance.
    return self.streams.setdefault(candidate.name, candidate)

Last update: 2023-11-13