Skip to content

Direct

Direct Channel is the basic way to route messages in Redis. Its core is very simple: channel sends messages to all consumers subscribed to it.

Scaling

If one channel is listening by several consumers, the message will be received by all consumers of this channel. Thus, horizontal scaling by increasing the number of consumer services is not possible only using Redis Pub/Sub.

If you need similar functionality, look for Redis Streams or other brokers (for example, Nats or RabbitMQ).

Example

Direct Channel is the default type used in Propan: you can simply declare it as follows

@broker.handler("test_channel")
async def handler():
    ...

Full example:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, RedisBroker
from propan.annotations import Logger

broker = RedisBroker()
app = PropanApp(broker)


@broker.handle("test")
async def handler1(logger: Logger):
    logger.info("handler1")


@broker.handle("test")
async def handler2(logger: Logger):
    logger.info("handler2")


@broker.handle("test2")
async def handler3(logger: Logger):
    logger.info("handler3")


@app.after_startup
async def publish_smth():
    await broker.publish("", "test")  # handlers: 1, 2
    await broker.publish("", "test2") # handlers: 3

Consumer Announcement

To begin with, we have announced several consumers for the two channels test and test2:

 8
 9
10
11
12
13
14
15
16
17
18
19
20
@broker.handle("test")
async def handler1(logger: Logger):
    logger.info("handler1")


@broker.handle("test")
async def handler2(logger: Logger):
    logger.info("handler2")


@broker.handle("test2")
async def handler3(logger: Logger):
    logger.info("handler3")

Note

Note that handler1 and handler2 are subscribed to the same channel: both of these handlers will receive messages.

Message distribution

Now the distribution of messages between these consumers will look like this:

    await broker.publish("", "test")  # handlers: 1, 2

The message 1 will be sent to handler1 and handler2 because they are listening to channel with the name test


    await broker.publish("", "test2") # handlers: 3

The message 2 will be sent to handler3 because it listens to channel with the name test2