Skip to content

StreamBuilder

faststream.nats.helpers.object_builder.StreamBuilder #

StreamBuilder()

A class to build streams.

Initialize the builder.

Source code in faststream/nats/helpers/object_builder.py
def __init__(self) -> None:
    """Create a builder whose object registry starts out empty."""
    # Every builder gets its own fresh dict; nothing is registered yet.
    registry = {}
    self.objects = registry

objects instance-attribute #

objects = {}

create #

create(name)

Return the stream cached under the given name, caching it on first use.

Source code in faststream/nats/helpers/object_builder.py
def create(
    self,
    name: Union[str, "JStream", None],
) -> Optional["JStream"]:
    """Return the stream cached under ``name``, caching it on first use.

    Args:
        name: A stream name, a ``JStream`` instance, or ``None``. The value
            is normalized with ``JStream.validate`` before the lookup.

    Returns:
        ``None`` when validation yields ``None``; otherwise the stream held
        in ``self.objects`` under that stream's name. The first stream
        stored for a name is kept, so later calls with the same name return
        that instance rather than the newly validated one.
    """
    stream = JStream.validate(name)
    if stream is None:
        return None

    # setdefault stores the stream only if its name is not cached yet and
    # always hands back the cached instance.
    return self.objects.setdefault(stream.name, stream)