
Object Storage

Object Storage is almost identical to the Key-Value storage concept, so most of the Key-Value guide applies here as well.

Overview

Object Storage is a high-level interface on top of NATS JetStream (NatsJS).

Under the hood it is a regular JetStream stream in which the object key is the subject.

The main difference between KV and Object Storage is that Object Storage can hold files larger than 1 MB (the KV limit). It places no limit on the maximum object size and stores each object in chunks (each message is one object chunk), so you can literally stream huge objects through NATS.
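
To make the chunking idea concrete, here is a minimal sketch of splitting a readable object into fixed-size pieces, the way each piece would travel as its own message. It does not talk to NATS. The `iter_chunks` helper and the `CHUNK_SIZE` value are hypothetical names chosen for illustration, not part of FastStream or the NATS client.

```python
from io import BytesIO
from typing import BinaryIO, Iterator

# Illustrative chunk size only; the value NATS actually uses may differ.
CHUNK_SIZE = 128 * 1024


def iter_chunks(stream: BinaryIO, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Yield successive chunks of at most chunk_size bytes until the stream ends."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        yield chunk


# A 300 KiB payload never has to be held as a single message:
# it is consumed piece by piece.
payload = BytesIO(b"x" * (300 * 1024))
chunks = list(iter_chunks(payload))
```

Because only one chunk is read at a time, memory use is bounded by the chunk size rather than by the object size.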

FastStream Details

FastStream provides several helper methods for working with the NATS Object Storage feature.

First, create an Object Storage object and put a value into it:

from io import BytesIO

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@app.after_startup
async def test_send():
    object_storage = await broker.object_storage("example-bucket")
    await object_storage.put("file.txt", BytesIO(b"File mock"))

Tip

  • BytesIO is a readable object used to emulate a file opened for reading.

  • broker.object_storage(bucket="example-bucket") is an idempotent method: it keeps every storage it has already created in memory and does not make a new request to NATS when you call it again for the same bucket.
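
The caching behaviour described in the tip can be pictured with the following sketch. It is not FastStream's actual implementation: `BucketCache`, `FakeBucket`, and `_declare` are hypothetical stand-ins, and the "request to NATS" is simulated with a counter.

```python
import asyncio
from typing import Dict


class FakeBucket:
    """Hypothetical stand-in for a real Object Storage bucket handle."""

    def __init__(self, name: str) -> None:
        self.name = name


class BucketCache:
    """Keeps already created buckets in memory, keyed by bucket name."""

    def __init__(self) -> None:
        self._buckets: Dict[str, FakeBucket] = {}
        self.requests = 0  # counts simulated round trips to the server

    async def _declare(self, bucket: str) -> FakeBucket:
        # Simulates the network request that creates or looks up the bucket.
        self.requests += 1
        await asyncio.sleep(0)
        return FakeBucket(bucket)

    async def object_storage(self, bucket: str) -> FakeBucket:
        # Only the first call for a given name reaches the server.
        if bucket not in self._buckets:
            self._buckets[bucket] = await self._declare(bucket)
        return self._buckets[bucket]


async def main() -> tuple:
    cache = BucketCache()
    first = await cache.object_storage("example-bucket")
    second = await cache.object_storage("example-bucket")
    return cache.requests, first is second


requests_made, same_object = asyncio.run(main())
```

Both calls return the same in-memory object, and only one simulated request is made.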


After that, you can use the returned object_storage object as a regular NATS one. If you want to watch for new files in the bucket, FastStream lets you do that through the regular @broker.subscriber interface:

@broker.subscriber("example-bucket", obj_watch=True)
async def handler(filename: str):
    assert filename == "file.txt"

NATS delivers only the filename (plus some meta information that you can access via message.raw_message) because files can be any size. The framework has to protect your service from memory overflow, so it cannot load the whole file content into memory for you. But you can fetch it manually in the following way:

from faststream import Logger
from faststream.nats.annotations import ObjectStorage

@broker.subscriber("example-bucket", obj_watch=True)
async def handler(
    filename: str,
    storage: ObjectStorage,
    logger: Logger,
):
    assert filename == "file.txt"
    file = await storage.get(filename)
    logger.info(file.data)

Note

faststream.nats.annotations.ObjectStorage is your current bucket, so you do not need to put it into the context manually.

Also, if you need more detailed settings for your Object Storage, use the ObjWatch object:

from faststream.nats import NatsBroker, ObjWatch

@broker.subscriber(
    "example-bucket",
    obj_watch=ObjWatch(declare=False),
)
async def handler(filename: str):
    ...