Skip to content

StreamBuilder

faststream.nats.helpers.StreamBuilder #

StreamBuilder()

A class to build streams.

Initialize the stream builder.

Source code in faststream/nats/helpers.py
def __init__(self) -> None:
    """Initialize the stream builder."""
    self.streams = {}

streams instance-attribute #

streams: Dict[str, JStream] = {}

stream #

stream(
    name: Union[str, JStream, None],
    *args: Any,
    declare: bool = True,
    **kwargs: Any
) -> Optional[JStream]

Get a stream.

PARAMETER DESCRIPTION
*args

The arguments.

TYPE: Any DEFAULT: ()

name

The stream name.

TYPE: Union[str, JStream, None]

declare

Whether to declare the stream.

TYPE: bool DEFAULT: True

**kwargs

The keyword arguments.

TYPE: Any DEFAULT: {}

Source code in faststream/nats/helpers.py
def stream(
    self,
    name: Union[str, JStream, None],
    *args: Any,
    declare: bool = True,
    **kwargs: Any,
) -> Optional[JStream]:
    """Get a stream.

    Args:
        *args: The arguments.
        name: The stream name.
        declare: Whether to declare the stream.
        **kwargs: The keyword arguments.
    """
    stream = JStream.validate(name)

    if stream is not None:
        stream = self.streams[stream.name] = self.streams.get(stream.name, stream)

    return stream