Skip to content

Pattern

Pattern Subject - мощный механизм маршрутизации NATS. Данный тип subject отправляет сообщения потребителям в соответсвии с паттерном, указанном при их подключении к subject и ключом самого сообщения.

Масштабирование

Если один subject слушает несколько потребителей с одинаковой queue group, сообщение будет уходить каждый раз случайному потребителю.

Таким образом, NATS может самостоятельно балансировать нагрузку на потребителей очереди. Вы можете увеличить скорость обработки потока сообщений из очереди просто запустив дополнительные инстансы сервиса-потребителя. Вам не нужно вносить изменений в текущую конфигурацию инфраструктуры: NATS сам позаботится о том, как распределить сообщения между вашими сервисами.

Пример

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from propan import PropanApp, NatsBroker
from propan.annotations import Logger

broker = NatsBroker()
app = PropanApp(broker)

@broker.handle("*.info", "workers")
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle("*.info", "workers")
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle("*.error", "workers")
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@app.after_startup
async def send_messages():
    await broker.publish("", "logs.info")  # handlers: 1 or 2
    await broker.publish("", "logs.info")  # handlers: 1 or 2
    await broker.publish("", "logs.error") # handlers: 3

Объявление потребителей

Для начала мы объявили несколько потребителей для двух subject: *.info и *.error:

 7
 8
 9
10
11
12
13
14
15
16
17
@broker.handle("*.info", "workers")
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle("*.info", "workers")
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle("*.error", "workers")
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

При этом в subject наших потребителей мы указываем паттерн, который будут обрабатываться этими потребителемя.

Note

Обратите внимание, что все потребители подписаны с использованием одной queue_group: в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно. Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними.

Распределение сообщений

Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:

    await broker.publish("", "logs.info")  # handlers: 1 or 2

Сообщение 1 будет отправлено в handler1 или handler2, т.к. они слушают один шаблон subject в рамках одной queue group


    await broker.publish("", "logs.info")  # handlers: 1 or 2

Сообщение 2 будет отправлено аналогично сообщению 1


    await broker.publish("", "logs.error") # handlers: 3

Сообщение 3 будет отправлено в handler3, т.к. он единственный слушает шаблон *.error*