
Direct Exchange#

The Direct Exchange is the basic way to route messages in RabbitMQ. The idea is simple: the exchange delivers a message to every queue whose routing_key exactly matches the routing_key of the message being sent.
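The matching rule can be illustrated with a small in-memory model. This is only a sketch of the idea, not RabbitMQ or FastStream code: the class `DirectExchangeModel` and its `bind` and `route` methods are hypothetical names introduced for illustration.

```python
from __future__ import annotations

from collections import defaultdict


class DirectExchangeModel:
    """Toy model of direct-exchange routing (hypothetical, not a RabbitMQ client)."""

    def __init__(self) -> None:
        # routing key -> names of the queues bound with that key
        self._bindings: dict[str, list[str]] = defaultdict(list)

    def bind(self, queue: str, routing_key: str) -> None:
        self._bindings[routing_key].append(queue)

    def route(self, routing_key: str) -> list[str]:
        # Exact string comparison only: no wildcards or patterns.
        return list(self._bindings.get(routing_key, []))


exchange = DirectExchangeModel()
exchange.bind("test-q-1", routing_key="test-q-1")
exchange.bind("test-q-2", routing_key="test-q-2")

matched = exchange.route("test-q-1")  # only the queue bound with "test-q-1"
unmatched = exchange.route("test-q")  # a partial match routes nowhere
```

In this model a message whose key matches no binding is simply routed to no queue, which is why the key comparison has to be exact.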

Note

The Default Exchange, to which every RabbitMQ queue is automatically bound, is a Direct exchange.

Scaling#

If several consumers listen to the same queue, each message is delivered to only one of them (round-robin). This behavior is the same for every exchange type because it is a property of the queue itself. The exchange type only determines which queues a message is routed to.

RabbitMQ therefore balances the load across queue consumers on its own. You can increase the processing speed of the message flow from a queue by launching additional instances of a consumer service. No changes to the existing infrastructure configuration are needed: RabbitMQ decides how to distribute messages between your services.
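To make the effect of adding consumers concrete, here is a minimal simulation of round-robin dispatch. It is a simplified model under the assumption that every consumer is always ready for the next message; the function `distribute` and the worker names are hypothetical and are not part of RabbitMQ or FastStream.

```python
from itertools import cycle


def distribute(messages, consumers):
    """Assign each message to exactly one consumer, taking consumers in turn."""
    turn = cycle(consumers)
    return [(message, next(turn)) for message in messages]


# One queue and five messages: first with one consumer instance, then with two.
messages = ["m1", "m2", "m3", "m4", "m5"]
single = distribute(messages, ["worker-a"])
scaled = distribute(messages, ["worker-a", "worker-b"])
```

With two workers, each one receives roughly half of the messages, and nothing about the queue or the exchange has to change.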

Example#

Tip

The Direct Exchange is the type used in FastStream by default. You can simply declare it as follows:

@broker.subscriber("test_queue", "test_exchange")
async def handler():
    ...

The argument auto_delete=True in this and subsequent examples is used only to clean up the RabbitMQ state after the examples run.

from faststream import FastStream, Logger
from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue

broker = RabbitBroker()
app = FastStream(broker)

exch = RabbitExchange("exchange", auto_delete=True)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)


@broker.subscriber(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")


@broker.subscriber(queue_1, exch)  # another service
async def base_handler2(logger: Logger):
    logger.info("base_handler2")


@broker.subscriber(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")


@app.after_startup
async def send_messages():
    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1
    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 2
    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1
    await broker.publish(queue="test-q-2", exchange=exch)  # handlers: 3

Consumer Announcement#

First, we declare our Direct exchange and several queues that will be bound to it:

exch = RabbitExchange("exchange", auto_delete=True)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)

Then we subscribe several consumers to the exchange we created, using the declared queues:

@broker.subscriber(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")


@broker.subscriber(queue_1, exch)  # another service
async def base_handler2(logger: Logger):
    logger.info("base_handler2")


@broker.subscriber(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

Note

base_handler1 and base_handler2 are subscribed to the same exchange using the same queue. Within a single service this makes no sense, since messages will simply reach these handlers in turn. Here we use it to emulate several consumers and the load balancing between them.

Message Distribution#

Now, the distribution of messages between these consumers will look like this:

    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1

Message 1 will be sent to base_handler1 because it listens to the exchange using a queue with the routing key test-q-1.


    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 2

Message 2 will be sent to base_handler2 because it listens to the exchange using the same queue, and the consumers of that queue receive messages in turn.


    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1

Message 3 will be sent to base_handler1 again because it is its turn once more.


    await broker.publish(queue="test-q-2", exchange=exch)  # handlers: 3

Message 4 will be sent to base_handler3 because it is the only one listening to the exchange using a queue with the routing key test-q-2.
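The whole sequence can be traced with a small model that combines exact routing-key matching with per-queue round-robin. This is a sketch under the assumption that consumers are served strictly in turn; `QueueModel` and `bindings` are hypothetical names, and the code talks to no broker.

```python
class QueueModel:
    """Toy queue that hands each message to its consumers in turn (hypothetical)."""

    def __init__(self, name, consumers):
        self.name = name
        self.consumers = consumers
        self._next = 0  # index of the consumer that receives the next message

    def deliver(self):
        consumer = self.consumers[self._next % len(self.consumers)]
        self._next += 1
        return consumer


# Direct exchange: each routing key maps to the queue bound with that key.
bindings = {
    "test-q-1": QueueModel("test-q-1", ["base_handler1", "base_handler2"]),
    "test-q-2": QueueModel("test-q-2", ["base_handler3"]),
}

# The four routing keys published in the example above, in order.
published = ["test-q-1", "test-q-1", "test-q-1", "test-q-2"]
trace = [bindings[key].deliver() for key in published]
```

Under these assumptions, `trace` lists the handlers in the same order as the `# handlers:` comments in the example.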


Last update: 2023-09-25