Skip to content

Fanout Exchange

Fanout Exchange - еще более простой, но чуть менее популярный способ маршрутизации в RabbitMQ. Данный тип exchange отправляет сообщения во все очереди, подписанные на него, игнорируя любые аргументы самого сообщения.

При этом, если очередь слушает несколько потребителей, сообщения все также будут распределяться между ними.

Пример

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from propan import PropanApp, RabbitBroker
from propan.annotations import Logger
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.FANOUT)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)

@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@app.after_startup
async def send_messages():
    await broker.publish(exchange=exch)  # handlers: 1, 3
    await broker.publish(exchange=exch)  # handlers: 2, 3
    await broker.publish(exchange=exch)  # handlers: 1, 3
    await broker.publish(exchange=exch)  # handlers: 2, 3

Объявление потребителей

Для начала мы объявили наш Fanout exchange и несколько очередей, которые будут его слушать:

 8
 9
10
11
exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.FANOUT)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)

Затем мы подписали несколько потребителей с помощью объявленных очередей на созданный нами exchange

13
14
15
16
17
18
19
20
21
22
23
@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

Note

Обратите внимание, что handler1 и handler2 подписаны на один exchange с помощью одной и той же очереди: в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно. Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними.

Распределение сообщений

Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:

    await broker.publish(exchange=exch)  # handlers: 1, 3

Сообщение 1 будет отправлено в handler1 и handler3, т.к. они слушает exchange с помощью разных очередей


    await broker.publish(exchange=exch)  # handlers: 2, 3

Сообщение 2 будет отправлено в handler2 и handler3, т.к. handler2 слушает exchange с помощью той же очереди, что и handler1


    await broker.publish(exchange=exch)  # handlers: 1, 3

Сообщение 3 будет отправлено в handler1 и handler3


    await broker.publish(exchange=exch)  # handlers: 2, 3

Сообщение 4 будет отправлено в handler3 и handler3


Note

При отправке сообщений в Fanout exchange нет смысл указывать аргументы queue или routing_key, т.к. они будут проигнорированы