Skip to content

Pattern

Pattern Subject is a powerful NATS routing engine. This type of subject messages to consumers by the pattern specified when they connect to subject and a message key.

Scaling

If one subject is listening by several consumers with the same queue group, the message will go to a random consumer each time.

Thus, NATS can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by simply launching additional instances of the consumer service. You don't need to make changes to the current infrastructure configuration: NATS will take care of how to distribute messages between your services.

Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from propan import PropanApp, NatsBroker
from propan.annotations import Logger

broker = NatsBroker()
app = PropanApp(broker)

@broker.handle("*.info", "workers")
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle("*.info", "workers")
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle("*.error", "workers")
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

@app.after_startup
async def send_messages():
    await broker.publish("", "logs.info")  # handlers: 1 or 2
    await broker.publish("", "logs.info")  # handlers: 1 or 2
    await broker.publish("", "logs.error") # handlers: 3

Consumer Announcement

To begin with, we have announced several consumers for two subjects: *.info and *.error:

 7
 8
 9
10
11
12
13
14
15
16
17
@broker.handle("*.info", "workers")
async def base_handler1(logger: Logger):
    logger.info("base_handler1")

@broker.handle("*.info", "workers")
async def base_handler2(logger: Logger):
    logger.info("base_handler2")

@broker.handle("*.error", "workers")
async def base_handler3(logger: Logger):
    logger.info("base_handler3")

At the same time, in the subject of our consumers, we specify the pattern that will be processed by these consumers.

Note

Note that all consumers are subscribed using the same queue_group: within the same service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them.

Message distribution

Now the distribution of messages between these consumers will look like this:

    await broker.publish("", "logs.info")  # handlers: 1 or 2

The message 1 will be sent to handler1 or handler2, because they listen to the same subject template within the same queue group


    await broker.publish("", "logs.info")  # handlers: 1 or 2

Message 2 will be sent similarly to message 1


    await broker.publish("", "logs.error") # handlers: 3

The message 3 will be sent to handler3, because it is the only one listening to the pattern *.error*