Skip to content

Pattern

Pattern Channel - мощный механизм маршрутизации Redis. Данный тип channel отправляет сообщения потребителям в соответсвии с паттерном, указанном при их подключении к channel и ключом самого сообщения.

Масштабирование

Если ключ сообщение совпадает с паттерном нескольких потребителей, оно будет отправлено всем им. Таким образом, горизонтальное масштабирование путем увеличения количества сервисов-потребителей невозможно только средствами Redis Pub/Sub.

Если вам нужен подобный функционал, посмотрите в сторону Redis Streams или других брокеров (например, Nats или RabbitMQ).

Пример

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, RedisBroker
from propan.annotations import Logger

broker = RedisBroker()
app = PropanApp(broker)


@broker.handle("*.info", pattern=True)
async def handler1(b: str, logger: Logger):
    logger.info("handler1")


@broker.handle("*.info", pattern=True)
async def handler2(b: str, logger: Logger):
    logger.info("handler2")


@broker.handle("*.error", pattern=True)
async def handler3(logger: Logger):
    logger.info("handler3")


@app.after_startup
async def publish_smth():
    await broker.publish("", "logs.info")  # handlers: 1, 2
    await broker.publish("", "logs.error") # handlers: 3

Объявление потребителей

Для начала мы объявили несколько потребителей для двух каналов *.info* и *.error:

 8
 9
10
11
12
13
14
15
16
17
18
19
20
@broker.handle("*.info", pattern=True)
async def handler1(b: str, logger: Logger):
    logger.info("handler1")


@broker.handle("*.info", pattern=True)
async def handler2(b: str, logger: Logger):
    logger.info("handler2")


@broker.handle("*.error", pattern=True)
async def handler3(logger: Logger):
    logger.info("handler3")

Note

Обратите внимание, что handler1 и handler2 подписаны на один channel: cообщения будут приходить оба этих обработчика.

Распределение сообщений

Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:

    await broker.publish("", "logs.info")  # handlers: 1, 2

Сообщение 1 будет отправлено в handler1 и handler2, т.к. они слушают channel с паттерном *.info*


    await broker.publish("", "logs.error") # handlers: 3

Сообщение 2 будет отправлено в handler3, т.к. он слушает channel с паттерном *.error