Skip to content

KVBucketDeclarer

faststream.nats.helpers.bucket_declarer.KVBucketDeclarer #

KVBucketDeclarer(connection)
Source code in faststream/nats/helpers/bucket_declarer.py
def __init__(self, connection: "JetStreamContext") -> None:
    self._connection = connection
    self.buckets = {}

buckets instance-attribute #

buckets = {}

create_key_value async #

create_key_value(bucket, *, description=None, max_value_size=None, history=1, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None, republish=None, direct=None, declare=True)
Source code in faststream/nats/helpers/bucket_declarer.py
async def create_key_value(
    self,
    bucket: str,
    *,
    description: Optional[str] = None,
    max_value_size: Optional[int] = None,
    history: int = 1,
    ttl: Optional[float] = None,  # in seconds
    max_bytes: Optional[int] = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    republish: Optional["RePublish"] = None,
    direct: Optional[bool] = None,
    # custom
    declare: bool = True,
) -> "KeyValue":
    if (key_value := self.buckets.get(bucket)) is None:
        if declare:
            key_value = await self._connection.create_key_value(
                config=KeyValueConfig(
                    bucket=bucket,
                    description=description,
                    max_value_size=max_value_size,
                    history=history,
                    ttl=ttl,
                    max_bytes=max_bytes,
                    storage=storage,
                    replicas=replicas,
                    placement=placement,
                    republish=republish,
                    direct=direct,
                )
            )
        else:
            key_value = await self._connection.key_value(bucket)

        self.buckets[bucket] = key_value

    return key_value