Skip to content

Pull Subscriber#

Overview#

NATS JetStream supports two various way to consume messages: Push and Pull consumers.

The Push consumer is used by default to consume messages with the FastStream. It means that the NATS server delivers messages to your consumer as far as possible by itself. However, it also means that NATS should control all current consumer connections and increase server load.

Thus, the Pull consumer is the recommended way to consume JetStream messages by the NATS TEAM. Using it, you simply ask NATS for new messages at some interval. It may sound a little less convenient than automatic message delivery, but it provides several advantages, such as:

  • Consumer scaling without a queue group
  • Handling messages in batches
  • Reducing NATS server load

So, if you want to consume a large flow of messages without strict time limitations, the Pull consumer is the right choice for you.

FastStream Details#

The Pull consumer is just a regular Stream consumer, but with the pull_sub argument, which controls consuming messages with batch size and block interval.

from faststream import FastStream, Logger
from faststream.nats import NatsBroker, PullSub

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber(
    subject="test",
    stream="stream",
    pull_sub=PullSub(batch_size=10),
)
async def handle(msg, logger: Logger):
    logger.info(msg)

The batch size doesn't mean that your msg argument is a list of messages, but it means that you consume up to 10 messages for one request to NATS and call your handler for each message in an asyncio.gather pool.

So, your subject will be processed much faster, without blocking for each message processing. However, if your subject has fewer than 10 messages, your request to NATS will be blocked for timeout (5 seconds by default) while trying to collect the required number of messages. Therefor, you should choose batch_size and timeout accurately to optimize your consumer efficiency.


Last update: 2023-11-09