Direct Exchange¶
Direct Exchange - базовый способ маршрутизации сообщений в RabbitMQ. Его суть очень проста:
exchange
отправляет сообщения в те очереди, routing_key
которых совпадает с routing_key
отправляемого сообщения.
Note
Default Exchange, на который подписаны все очереди в RabbitMQ по умолчанию имеет тип Direct
Масштабирование¶
Если одну очередь слушает несколько потребителей, сообщение будет уходить каждый раз новому потребителю. Это поведение общее для всех типов exchange
, т.к. оно относится к самой очереди. Тип exchange
влияет на то, в какие очереди попадет сообщение.
Таким образом, RabbitMQ может самостоятельно балансировать нагрузку на потребителей очереди. Вы можете увеличить скорость обработки потока сообщений из очереди просто запустив дополнительные инстансы сервиса-потребителя. Вам не нужно вносить изменений в текущую конфигурацию инфраструктуры: RabbitMQ сам позаботится о том, как распределить сообщения между вашими сервисами.
Пример¶
Direct Exchange - тип, используемый в Propan по умолчанию: вы можете просто объявить его следующим образом
@broker.handler("test_queue", "test_exchange")
async def handler():
...
Аргумент auto_delete=True
в этом и последующих примерах используется только для того, чтобы очистить состояние RabbitMQ после примера
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
Объявление потребителей¶
Для начала мы объявили наш Direct exchange и несколько очередей, которые будут его слушать:
8 9 10 11 |
|
Затем мы подписали несколько потребителей с помощью объявленных очередей на созданный нами exchange
13 14 15 16 17 18 19 20 21 22 23 |
|
Note
Обратите внимание, что handler1
и handler2
подписаны на один exchange
с помощью одной и той же очереди:
в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно.
Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними.
Распределение сообщений¶
Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1
Сообщение 1
будет отправлено в handler1
, т.к. он слушает exchange
с помощью очереди с ключом маршрутизации test-q-1
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 2
Сообщение 2
будет отправлено в handler2
, т.к. он слушает exchange
с помощью той же очереди, но handler1
занят
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1
Сообщение 3
снова будет отправлено в handler1
, т.к. он освободился на данный момент
await broker.publish(queue="test-q-2", exchange=exch) # handlers: 3
Сообщение 4
будет отправлено в handler3
, т.к. он единственный слушает exchange
с помощью очереди с ключом маршрутизации test-q-2