Skip to content

Topic Exchange

Topic Exchange - мощный механизм маршрутизации RabbitMQ. Данный тип exchange отправляет сообщения в очереди в соответствии с паттерном, указанном при их подключении к exchange и routing_key самого сообщения.

При этом, если очередь слушает несколько потребителей, сообщения все также будут распределяться между ними.

Пример

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from propan import PropanApp, RabbitBroker
from propan.annotations import Logger
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.TOPIC)

queue_1 = RabbitQueue("test-queue-1", auto_delete=True, routing_key="*.info")
queue_2 = RabbitQueue("test-queue-2", auto_delete=True, routing_key="*.debug")

@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@app.after_startup
async def send_messages():
    await broker.publish(routing_key="logs.info", exchange=exch)  # handlers: 1
    await broker.publish(routing_key="logs.info", exchange=exch)  # handlers: 2
    await broker.publish(routing_key="logs.info", exchange=exch)  # handlers: 1
    await broker.publish(routing_key="logs.debug", exchange=exch) # handlers: 3

Объявление потребителей

Для начала мы объявили наш Topic exchange и несколько очередей, которые будут его слушать:

 8
 9
10
11
exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.TOPIC)

queue_1 = RabbitQueue("test-queue-1", auto_delete=True, routing_key="*.info")
queue_2 = RabbitQueue("test-queue-2", auto_delete=True, routing_key="*.debug")

При этом в routing_key наших очередей мы указываем паттерн ключей маршрутизации, которые будут обрабатываться этой очередью.

Затем мы подписали несколько потребителей с помощью объявленных очередей на созданный нами exchange

13
14
15
16
17
18
19
20
21
22
23
@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

Note

Обратите внимание, что handler1 и handler2 подписаны на один exchange с помощью одной и той же очереди: в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно. Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними.

Распределение сообщений

Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:

    await broker.publish(routing_key="logs.info", exchange=exch)  # handlers: 1

Сообщение 1 будет отправлено в handler1, т.к. он слушает exchange с помощью очереди с ключом маршрутизации *.info


    await broker.publish(routing_key="logs.info", exchange=exch)  # handlers: 2

Сообщение 2 будет отправлено в handler2, т.к. он слушает exchange с помощью той же очереди, но handler1 занят


    await broker.publish(routing_key="logs.info", exchange=exch)  # handlers: 1

Сообщение 3 снова будет отправлено в handler1, т.к. он освободился на данный момент


    await broker.publish(routing_key="logs.debug", exchange=exch) # handlers: 3

Сообщение 4 будет отправлено в handler3, т.к. он единственный слушает exchange с помощью очереди с ключом маршрутизации *.debug