Перейти к содержанию

Direct Exchange

Direct Exchange - базовый способ маршрутизации сообщений в RabbitMQ. Его суть очень проста: exchange отправляет сообщения в те очереди, routing_key которых совпадает с routing_key отправляемого сообщения.

Note

Default Exchange, на который подписаны все очереди в RabbitMQ по умолчанию имеет тип Direct

Масштабирование

Если одну очередь слушает несколько потребителей, сообщение будет уходить каждый раз новому потребителю. Это поведение общее для всех типов exchange, т.к. оно относится к самой очереди. Тип exchange влияет на то, в какие очереди попадет сообщение.

Таким образом, RabbitMQ может самостоятельно балансировать нагрузку на потребителей очереди. Вы можете увеличить скорость обработки потока сообщений из очереди просто запустив дополнительные инстансы сервиса-потребителя. Вам не нужно вносить изменений в текущую конфигурацию инфраструктуры: RabbitMQ сам позаботится о том, как распределить сообщения между вашими сервисами.

Пример

Direct Exchange - тип, используемый в Propan по умолчанию: вы можете просто объявить его следующим образом

@broker.handler("test_queue", "test_exchange")
async def handler():
    ...

Аргумент auto_delete=True в этом и последующих примерах используется только для того, чтобы очистить состояние RabbitMQ после примера

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from propan import PropanApp, RabbitBroker
from propan.annotations import Logger
from propan.brokers.rabbit import RabbitExchange, RabbitQueue

broker = RabbitBroker()
app = PropanApp(broker)

exch = RabbitExchange("exchange", auto_delete=True)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)

@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@app.after_startup
async def send_messages():
    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1
    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 2
    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1
    await broker.publish(queue="test-q-2", exchange=exch)  # handlers: 3

Объявление потребителей

Для начала мы объявили наш Direct exchange и несколько очередей, которые будут его слушать:

 8
 9
10
11
exch = RabbitExchange("exchange", auto_delete=True)

queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)

Затем мы подписали несколько потребителей с помощью объявленных очередей на созданный нами exchange

13
14
15
16
17
18
19
20
21
22
23
@broker.handle(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle(queue_1, exch)
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

Note

Обратите внимание, что handler1 и handler2 подписаны на один exchange с помощью одной и той же очереди: в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно. Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними.

Распределение сообщений

Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:

    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1

Сообщение 1 будет отправлено в handler1, т.к. он слушает exchange с помощью очереди с ключом маршрутизации test-q-1


    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 2

Сообщение 2 будет отправлено в handler2, т.к. он слушает exchange с помощью той же очереди, но handler1 занят


    await broker.publish(queue="test-q-1", exchange=exch)  # handlers: 1

Сообщение 3 снова будет отправлено в handler1, т.к. он освободился на данный момент


    await broker.publish(queue="test-q-2", exchange=exch)  # handlers: 3

Сообщение 4 будет отправлено в handler3, т.к. он единственный слушает exchange с помощью очереди с ключом маршрутизации test-q-2