Skip to content

Routing

General behavior

To declare a broker message handler function, use the decorator @broker.handle:

@broker.handle("test")
async def base_handler(body: str):
    ...

This behavior is the same for all brokers, but the parameters passed to @broker.handle are specific for each broker.

To learn more about the behavior of specialized brokers, go to the following sections:

Delayed handlers registration

If you don't want to mix an application "business logic" with the routing one, you can use the standard Python decorators behavior "without sugar": just define a handler function, and register it later.

async def base_handler(body: str):
    ...

broker.handle("test")(base_handler)

BrokerRouter

Sometimes it can be convenient to divide handlers into groups that can be connected to your application with just one command. To do this, Propan provides BrokerRouter: you can register your handlers within the router, and then connect this router to your broker.

This will help to better organize your application's code and also allow you to divide it into plug-ins.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, RedisBroker, RedisRouter

router = RedisRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = RedisBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, RabbitBroker, RabbitRouter

router = RabbitRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = RabbitBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, KafkaBroker, KafkaRouter

router = KafkaRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = KafkaBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, SQSBroker, SQSRouter

router = SQSRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = SQSBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, NatsBroker, NatsRouter

router = NatsRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = NatsBroker()
broker.include_router(router)
app = PropanApp(broker)

In this case, the router prefix will be added to the name of the queue of your handlers.

@app.after_startup
async def publish_test():
    await broker.publish("user-fake-uuid", "user/created")

Error handling

However, all brokers that support acknowledgement have the retry flag in the @broker.handle method, which is responsible for error handling logic.

By default, this flag has the value False, which indicates that if an error has occurred during message processing, it can still be retrieved from the queue:

@broker.handle("test", retry=False) # don't handle exceptions
async def base_handler(body: str):
    ...

If this flag is set to True, the message will be placed back in the queue every time an error occurs. In this case, the message can be processed both by another consumer (if there are several of them) and by the same one:

@broker.handle("test", retry=True)  # try again indefinitely
async def base_handler(body: str):
    ...

If the retry flag is set to int, the message will be placed back in the queue and the number of retries will be limited to this number:

@broker.handle("test", retry=3)     # make up to 3 attempts
async def base_handler(body: str):
    ...

Bug

At the moment, attempts are taken into account only by the current consumer. If the message goes to another consumer, he will have his own counter. Subsequently, this logic will be reworked.

Tip

At more complex error handling cases you can use tenacity