Skip to content

Fanout Exchange

Fanout Exchange is an even simpler, but slightly less popular way of routing in RabbitMQ. This type of exchange sends messages to all queues subscribed to it, ignoring any arguments of the message.

At the same time, if the queue listens to several consumers, messages will also be distributed among them.

Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from propan import PropanApp, RabbitBroker
from propan.annotations import Logger
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.FANOUT)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)

@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@app.after_startup
async def send_messages():
    await broker.publish(exchange=exch)  # handlers: 1, 3
    await broker.publish(exchange=exch)  # handlers: 2, 3
    await broker.publish(exchange=exch)  # handlers: 1, 3
    await broker.publish(exchange=exch)  # handlers: 2, 3

Consumer Announcement

To begin with, we announced our Fanout exchange and several queues that will listen to it:

 8
 9
10
11
exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.FANOUT)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)

Then we signed up several consumers using the advertised queues to the exchange we created

13
14
15
16
17
18
19
20
21
22
23
@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

Note

handler1 and handler2 are subscribed to the same exchange using the same queue: within a single service, this does not make a sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them.

Message distribution

Now the distribution of messages between these consumers will look like this:

    await broker.publish(exchange=exch)  # handlers: 1, 3

Messages 1 will be sent to handler1 and handler3, because they listen to exchange using different queues


    await broker.publish(exchange=exch)  # handlers: 2, 3

Messages 2 will be sent to handler2 and handler3, because handler2 listens to exchange using the same queue as handler1


    await broker.publish(exchange=exch)  # handlers: 1, 3

Messages 3 will be sent to handler1 and handler3


    await broker.publish(exchange=exch)  # handlers: 2, 3

Messages 4 will be sent to handler3 and handler3


Note

When sending messages to Fanout exchange, it makes no sense to specify the arguments queue or routing_key, because they will be ignored