Перейти к содержанию

Direct

Direct Channel - базовый способ маршрутизации сообщений в Redis. Его суть очень проста: channel отправляет сообщения всем потребителям, подписанным на него.

Масштабирование

Если один канал слушает несколько потребителей, сообщение будет получено всеми потребителями этого канала. Таким образом, горизонтальное масштабирование путем увеличения количества сервисов-потребителей невозможно только средствами Redis Pub/Sub.

Если вам нужен подобный функционал, посмотрите в сторону Redis Streams или других брокеров (например, Nats или RabbitMQ).

Пример

Direct Channel - тип, используемый в Propan по умолчанию: вы можете просто объявить его следующим образом

@broker.handler("test_channel")
async def handler():
    ...

Полный пример:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, RedisBroker
from propan.annotations import Logger

broker = RedisBroker()
app = PropanApp(broker)


@broker.handle("test")
async def handler1(logger: Logger):
    logger.info("handler1")


@broker.handle("test")
async def handler2(logger: Logger):
    logger.info("handler2")


@broker.handle("test2")
async def handler3(logger: Logger):
    logger.info("handler3")


@app.after_startup
async def publish_smth():
    await broker.publish("", "test")  # handlers: 1, 2
    await broker.publish("", "test2") # handlers: 3

Объявление потребителей

Для начала мы объявили несколько потребителей для двух каналов test и test2:

 8
 9
10
11
12
13
14
15
16
17
18
19
20
@broker.handle("test")
async def handler1(logger: Logger):
    logger.info("handler1")


@broker.handle("test")
async def handler2(logger: Logger):
    logger.info("handler2")


@broker.handle("test2")
async def handler3(logger: Logger):
    logger.info("handler3")

Note

Обратите внимание, что handler1 и handler2 подписаны на один channel: cообщения будут приходить оба этих обработчика.

Распределение сообщений

Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом:

    await broker.publish("", "test")  # handlers: 1, 2

Сообщение 1 будет отправлено в handler1 и handler2, т.к. они слушают channel с названием test


    await broker.publish("", "test2") # handlers: 3

Сообщение 2 будет отправлено в handler3, т.к. он слушает channel с названием test2