Skip to content

Consuming Acknowledgements#

As you may know, Kafka consumer should commit a topic offset when consuming a message.

The default behaviour, also implemented as such in the FastStream, automatically commits (acks) topic offset on message consumption. This is the at most once consuming strategy.

However, if you wish to use at least once strategy, you should commit offset AFTER the message is processed correctly. To accomplish that, set a consumer group and disable auto_commit option like this:

@broker.subscriber(
    "test", group_id="group", auto_commit=False
)
async def base_handler(body: str):
    ...

This way, upon successful return of the processing function, the message processed will be acknowledged. In the case of an exception being raised, the message will not be acknowledged.

However, there are situations where you might want to use a different acknowledgement logic.

Manual Acknowledgement#

If you want to acknowledge a message manually, you can get direct access to the message object via the Context and acknowledge the message by calling the ack method:

from faststream.kafka.annotations import KafkaMessage


@broker.subscriber(
    "test", group_id="group", auto_commit=False
)
async def base_handler(body: str, msg: KafkaMessage):
    await msg.ack()
    # or
    await msg.nack()

Tip

You can use the nack method to prevent offset commit and the message can be consumed by another consumer within the same group.

FastStream will see that the message was already acknowledged and will do nothing at the end of the process.

Interrupt Process#

If you wish to interrupt the processing of a message at any call stack level and acknowledge the message, you can achieve that by raising the faststream.exceptions.AckMessage.

from faststream import FastStream
from faststream.exceptions import AckMessage
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber(
    "test-topic", group_id="test-group", auto_commit=False
)
async def handle(body):
    smth_processing(body)


def smth_processing(body):
    if True:
        raise AckMessage()


@app.after_startup
async def test_publishing():
    await broker.publish("Hello!", "test-topic")

This way, FastStream interrupts the current message processing and acknowledges it immediately. Similarly, you can raise NackMessage as well to prevent the message from being committed.


Last update: 2023-10-15