Pattern#

A pattern subject is a powerful NATS routing mechanism. It routes each message to consumers by matching the message's subject (its key) against the pattern each consumer specified when subscribing.
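To make the matching rule concrete, here is a minimal pure-Python sketch of NATS-style wildcard matching. It is an illustration only, not the NATS server's or FastStream's implementation, and the function name `subject_matches` is hypothetical. It assumes the standard NATS wildcards: `*` stands for exactly one dot-separated token, and `>` (not used elsewhere on this page) stands for one or more trailing tokens.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a dot-separated subject matches a NATS-style pattern."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")

    for index, token in enumerate(pattern_tokens):
        if token == ">":
            # ">" must be the last pattern token and needs at least one
            # remaining subject token to match.
            return index == len(pattern_tokens) - 1 and len(subject_tokens) > index
        if index >= len(subject_tokens):
            return False  # the subject ran out of tokens
        if token != "*" and token != subject_tokens[index]:
            return False  # literal token mismatch

    # Without ">", both sides must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)
```

Under these rules, "logs.info" matches "*.info", while "logs.error" and "app.logs.info" do not.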

Scaling#

If several consumers listen to the same subject with the same queue group, each message goes to only one of them, chosen at random.

Thus, NATS balances the load across queue consumers on its own. You can increase the throughput of the message flow simply by launching additional instances of the consumer service. You don't need to change the current infrastructure configuration: NATS decides how to distribute messages between your services.
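The effect of adding instances can be sketched with a small simulation. This is a hedged illustration that assumes uniformly random selection within a queue group; the names `deliver`, `simulate`, and the `worker-N` labels are hypothetical and are not part of NATS or FastStream.

```python
import random
from collections import Counter


def deliver(queue_members: list[str], rng: random.Random) -> str:
    """Pick the single queue-group member that receives one message."""
    return rng.choice(queue_members)


def simulate(message_count: int, queue_members: list[str], seed: int = 0) -> Counter:
    """Count how many messages each member of the group ends up handling."""
    rng = random.Random(seed)
    return Counter(deliver(queue_members, rng) for _ in range(message_count))


if __name__ == "__main__":
    # Adding a third instance spreads the same traffic over more consumers.
    print(simulate(9_000, ["worker-1", "worker-2"]))
    print(simulate(9_000, ["worker-1", "worker-2", "worker-3"]))
```

Every message is counted exactly once, so the per-worker share shrinks as more members join the group.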

Tip

By default, every subscriber consumes messages from its subject in blocking mode: it can't process multiple messages from the same subject at the same time, so there is effectively a lock per subject.

However, every NatsBroker subscriber accepts a max_workers argument, which lets it consume messages in a per-subscriber pool. For example, a subscriber declared as @broker.subscriber(..., max_workers=10) can process up to 10 messages at the same time.
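The idea of a bounded per-subscriber pool can be sketched with plain asyncio. This is not how FastStream implements max_workers internally; it is a minimal model that assumes a semaphore-limited pool, and the names `consume` and `handle` are hypothetical.

```python
import asyncio


async def consume(messages: list[str], max_workers: int) -> int:
    """Process messages with at most max_workers handlers running at once.

    Returns the highest number of handlers observed running concurrently.
    """
    limiter = asyncio.Semaphore(max_workers)
    running = 0
    peak = 0

    async def handle(message: str) -> None:
        nonlocal running, peak
        async with limiter:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real message processing
            running -= 1

    await asyncio.gather(*(handle(m) for m in messages))
    return peak
```

With max_workers=1 the handlers run strictly one after another, which mirrors the default blocking behavior; a larger value lets that many messages overlap.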

Example#

from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("*.info", "workers")
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.subscriber("*.info", "workers")
async def base_handler2(logger: Logger):  
    logger.info("base_handler2")

@broker.subscriber("*.error", "workers")
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@app.after_startup
async def send_messages():
    await broker.publish("", "logs.info")  # handlers: 1 or 2
    await broker.publish("", "logs.info")  # handlers: 1 or 2
    await broker.publish("", "logs.error") # handlers: 3

Consumer Announcement#

First, we declare several consumers for two subject patterns, "*.info" and "*.error":

@broker.subscriber("*.info", "workers")
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.subscriber("*.info", "workers")
async def base_handler2(logger: Logger):  
    logger.info("base_handler2")

@broker.subscriber("*.error", "workers")
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

The subject passed to each subscriber is the pattern of message subjects that the consumer will process.

Note

All consumers here subscribe with the same queue group ("workers"). Within a single service this makes little sense, since the handlers would simply take turns receiving messages. Here it only emulates several consumers and the load balancing between them.

Message Distribution#

Now the distribution of messages between these consumers will look like this:

await broker.publish("", "logs.info")  # handlers: 1 or 2

Message 1 will be sent to base_handler1 or base_handler2, because both listen to the same subject pattern within the same queue group.


await broker.publish("", "logs.info")  # handlers: 1 or 2

Message 2 is distributed in the same way as message 1.


await broker.publish("", "logs.error") # handlers: 3

Message 3 will be sent to base_handler3, because it is the only consumer listening to the pattern "*.error".
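The routing of these three messages can be traced with a short pure-Python sketch. It mirrors the subscriptions from the example but is only a model of the routing decision: the `SUBSCRIPTIONS` table and the helpers `matches` and `candidates` are hypothetical, and the matcher handles only the single-token `*` wildcard used on this page.

```python
# (pattern, queue group, handler name), copied from the example above.
SUBSCRIPTIONS = [
    ("*.info", "workers", "base_handler1"),
    ("*.info", "workers", "base_handler2"),
    ("*.error", "workers", "base_handler3"),
]


def matches(pattern: str, subject: str) -> bool:
    """Match dot-separated tokens, where "*" stands for exactly one token."""
    pattern_tokens, subject_tokens = pattern.split("."), subject.split(".")
    return len(pattern_tokens) == len(subject_tokens) and all(
        p in ("*", s) for p, s in zip(pattern_tokens, subject_tokens)
    )


def candidates(subject: str) -> dict[str, list[str]]:
    """Group the handlers whose pattern matches the subject by queue group.

    Only one handler from each group would receive the message.
    """
    groups: dict[str, list[str]] = {}
    for pattern, queue, handler in SUBSCRIPTIONS:
        if matches(pattern, subject):
            groups.setdefault(queue, []).append(handler)
    return groups
```

Calling candidates("logs.info") lists both base_handler1 and base_handler2 under "workers", while candidates("logs.error") lists only base_handler3.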