Skip to content

Stream Acknowledgement#

When working with Redis streams in the FastStream library, it's important to manage message acknowledgements carefully to ensure that messages are not lost and that they have been processed as intended.

By default, when using the FastStream with a Redis stream, the library will automatically acknowledge (ack) that a message has been processed. This follows the at most once processing guarantee.

Manual Acknowledgement#

In cases where you want explicit control over when a message is acknowledged, you can manually acknowledge a message by accessing the ack and nack methods provided:

from faststream.redis.annotations import RedisMessage, Redis

# Setup broker and faststream app
...

@broker.subscriber(StreamSub("test-stream", group="test-group", consumer="1"))
async def base_handler(body: dict, msg: RedisMessage, redis: Redis):
    # Process the message
    ...

    # Manually acknowledge the message
    await msg.ack(redis)
    # or, if processing fails and you want to reprocess later
    await msg.nack()

Using ack will mark the message as processed in the stream, while nack is useful for situations where you might need to reprocess a message due to a handling failure.

Interrupt Process#

If the need arises to instantly interrupt message processing at any point in the call stack and acknowledge the message, you can achieve this by raising the faststream.exceptions.AckMessage exception:

from faststream import FastStream
from faststream.exceptions import AckMessage
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber(stream=StreamSub("test-stream", group="test-group", consumer="1"))
async def handle(body):
    processing_logic(body)


def processing_logic(body):
    if True:
        raise AckMessage()


@app.after_startup
async def test_publishing():
    await broker.publish("Hello World!", stream="test-stream")

By raising AckMessage, FastStream will halt the current message processing routine and immediately acknowledge it. Analogously, raising NackMessage would prevent the message from being acknowledged and could lead to its subsequent reprocessing by the same or a different consumer.

Tip

If you want to disable FastStream Acknowledgement logic at all, you can use @broker.subscriber(..., no_ack=True) option. This way you should always process a message (ack/nack/terminate/etc) by yourself.