Direct Exchange¶
Direct Exchange - базовый способ маршрутизации сообщений в RabbitMQ. Его суть очень проста:
exchange отправляет сообщения в те очереди, routing_key которых совпадает с routing_key отправляемого сообщения.
Note
Default Exchange, на который подписаны все очереди в RabbitMQ по умолчанию имеет тип Direct
Масштабирование¶
Если одну очередь слушает несколько потребителей, сообщение будет уходить каждый раз новому потребителю. Это поведение общее для всех типов exchange, т.к. оно относится к самой очереди. Тип exchange влияет на то, в какие очереди попадет сообщение.
Таким образом, RabbitMQ может самостоятельно балансировать нагрузку на потребителей очереди. Вы можете увеличить скорость обработки потока сообщений из очереди просто запустив дополнительные инстансы сервиса-потребителя. Вам не нужно вносить изменений в текущую конфигурацию инфраструктуры: RabbitMQ сам позаботится о том, как распределить сообщения между вашими сервисами.
Пример¶
Direct Exchange - тип, используемый в Propan по умолчанию: вы можете просто объявить его следующим образом
@broker.handler("test_queue", "test_exchange")
async def handler():
...
Аргумент auto_delete=True в этом и последующих примерах используется только для того, чтобы очистить состояние RabbitMQ после примера
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | |
Объявление потребителей¶
Для начала мы объявили наш Direct exchange и несколько очередей, которые будут его слушать:
8 9 10 11 | |
Затем мы подписали несколько потребителей с помощью объявленных очередей на созданный нами exchange
13 14 15 16 17 18 19 20 21 22 23 | |
Note
Обратите внимание, что handler1 и handler2 подписаны на один exchange с помощью одной и той же очереди:
в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно.
Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними.
Распределение сообщений¶
Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1
Сообщение 1 будет отправлено в handler1, т.к. он слушает exchange с помощью очереди с ключом маршрутизации test-q-1
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 2
Сообщение 2 будет отправлено в handler2, т.к. он слушает exchange с помощью той же очереди, но handler1 занят
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1
Сообщение 3 снова будет отправлено в handler1, т.к. он освободился на данный момент
await broker.publish(queue="test-q-2", exchange=exch) # handlers: 3
Сообщение 4 будет отправлено в handler3, т.к. он единственный слушает exchange с помощью очереди с ключом маршрутизации test-q-2