Перейти к содержанию

Header Exchange

Header Exchange - самый сложный и гибкий способ маршрутизации сообщений в RabbitMQ. Данный тип exchange отправляет сообщения в очереди в соответствии с совпадением аргументов привязки этих очередей к exchange с заголовками сообщений.

При этом, если очередь слушает несколько потребителей, сообщения все также будут распределяться между ними.

Пример

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from propan import PropanApp, RabbitBroker
from propan.annotations import Logger
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.HEADERS)

queue_1 = RabbitQueue(
    "test-queue-1", auto_delete=True,
    bind_arguments={ "key": 1 }
)
queue_2 = RabbitQueue(
    "test-queue-2", auto_delete=True,
    bind_arguments={ "key": 2, "key2": 2, "x-match": "any" }
)
queue_3 = RabbitQueue(
    "test-queue-3", auto_delete=True,
    bind_arguments={ "key": 2, "key2": 2, "x-match": "all" }
)

@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@broker.handle(queue_3, exch)
async def base_handler4(logger: Logger):
    logger.info("base_handler4")

@app.after_startup
async def send_messages():
    await broker.publish(exchange=exch, headers={ "key": 1 })  # handlers: 1
    await broker.publish(exchange=exch, headers={ "key": 1 })  # handlers: 2
    await broker.publish(exchange=exch, headers={ "key": 1 })  # handlers: 1
    await broker.publish(exchange=exch, headers={ "key": 2 })  # handlers: 3
    await broker.publish(exchange=exch, headers={ "key2": 2 }) # handlers: 3
    await broker.publish(exchange=exch, headers={ "key": 2,    # handlers: 3, 4
                                                  "key2": 2 })

Объявление потребителей

Для начала мы объявили наш Fanout exchange и несколько очередей, которые будут его слушать:

 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.HEADERS)

queue_1 = RabbitQueue(
    "test-queue-1", auto_delete=True,
    bind_arguments={ "key": 1 }
)
queue_2 = RabbitQueue(
    "test-queue-2", auto_delete=True,
    bind_arguments={ "key": 2, "key2": 2, "x-match": "any" }
)
queue_3 = RabbitQueue(
    "test-queue-3", auto_delete=True,
    bind_arguments={ "key": 2, "key2": 2, "x-match": "all" }
)

Аргумент x-match говорит о том, должны сопадать аргументы с заголовками сообщений полностью или частично.

Затем мы подписали несколько потребителей с помощью объявленных очередей на созданный нами exchange

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@broker.handle(queue_3, exch)
async def base_handler4(logger: Logger):
    logger.info("base_handler4")

Note

Обратите внимание, что handler1 и handler2 подписаны на один exchange с помощью одной и той же очереди: в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно. Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними.

Распределение сообщений

Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:

    await broker.publish(exchange=exch, headers={ "key": 1 })  # handlers: 1

Сообщение 1 будет отправлено в handler1, т.к. он слушает очередь, заголовок key которой, совпал с заголовком key сообщения


    await broker.publish(exchange=exch, headers={ "key": 1 })  # handlers: 2

Сообщение 2 будет отправлено в handler2, т.к. он слушает exchange с помощью той же очереди, но handler1 занят


    await broker.publish(exchange=exch, headers={ "key": 1 })  # handlers: 1

Сообщение 3 снова будет отправлено в handler1, т.к. он освободился на данный момент


    await broker.publish(exchange=exch, headers={ "key": 2 })  # handlers: 3

Сообщение 4 будет отправлено в handler3, т.к. он слушает очередь, заголовок key которой, совпал с заголовком key сообщения


    await broker.publish(exchange=exch, headers={ "key2": 2 }) # handlers: 3

Сообщение 5 будет отправлено в handler3, т.к. он слушает очередь, заголовок key2 которой, совпал с заголовком key2 сообщения


    await broker.publish(exchange=exch, headers={ "key": 2,    # handlers: 3, 4
                                                  "key2": 2 })

Сообщение 6 будет отправлено в handler3 и handler4, т.к. заголовки сообщений полностью совпали с ключами очередей


Note

При отправке сообщений в Header exchange нет смысл указывать аргументы queue или routing_key, т.к. они будут проигнорированы

Warning

Для невероятно сложных маршрутов вы можете использовать возможность подписывать exchange на другой exchange с указанием ключа маршрутизации. В таком случае действуют все те же правила, что и для очередей, подписанных на exchange. Отличие только в том, что подписанный exchange может дальше распределять сообщения в соответствии со своими правилами.

Так, например, вы можете совместить Topic и Header типы.