Skip to content

Pattern

Pattern Channel is a powerful Redis routing engine. This type of channel sends messages to consumers by the pattern specified when they connect to the channel and a message key.

Scaling

If the message key matches the pattern of several consumers, it will be sent to all them. Thus, horizontal scaling by increasing the number of consumer services is not possible only using Redis Pub/Sub.

If you need similar functionality, look towards Redis Streams or other brokers (for example, Nats or RabbitMQ).

Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, RedisBroker
from propan.annotations import Logger

broker = RedisBroker()
app = PropanApp(broker)


@broker.handle("*.info", pattern=True)
async def handler1(b: str, logger: Logger):
    logger.info("handler1")


@broker.handle("*.info", pattern=True)
async def handler2(b: str, logger: Logger):
    logger.info("handler2")


@broker.handle("*.error", pattern=True)
async def handler3(logger: Logger):
    logger.info("handler3")


@app.after_startup
async def publish_smth():
    await broker.publish("", "logs.info")  # handlers: 1, 2
    await broker.publish("", "logs.error") # handlers: 3

Consumer Announcement

To begin with, we have announced several consumers for two channels *.info* and *.error:

 8
 9
10
11
12
13
14
15
16
17
18
19
20
@broker.handle("*.info", pattern=True)
async def handler1(b: str, logger: Logger):
    logger.info("handler1")


@broker.handle("*.info", pattern=True)
async def handler2(b: str, logger: Logger):
    logger.info("handler2")


@broker.handle("*.error", pattern=True)
async def handler3(logger: Logger):
    logger.info("handler3")

Note

Note that handler1 and handler2 are subscribed to the same channel: both of these handlers will receive messages.

Message distribution

Now the distribution of messages between these consumers will look like this:

    await broker.publish("", "logs.info")  # handlers: 1, 2

The message 1 will be sent to handler1 and handler2 because they are listening to channel with the pattern *.info*


    await broker.publish("", "logs.error") # handlers: 3

The message 2 will be sent to handler3 because it listens to channel with the pattern *.error