Routing¶
Общее поведение¶
Для объявления функции - обработчика сообщений брокера необходимо использовать декоратор @broker.handle
@broker.handle("test")
async def base_handler(body: str):
...
Это поведение схоже для всех брокеров, однако, параметры, передаваемые в @broker.handle
- являются специфичными для каждого брокера.
Чтобы узнать подробнее о поведении специализированных брокеров перейдите в следующие разделы:
Отложенная регистрация обработчиков¶
Если вы не хотите смешавать "бизнес-логику" вашего приложения с логикой маршрутизации, вы можете воспользоваться стандартным поведением декораторов "без сахара": просто определите функцию-обработчик, а зарегистрируйте ее позже.
async def base_handler(body: str):
...
broker.handle("test")(base_handler)
BrokerRouter¶
Иногда может быть удобно разделить обработчики на группы, которые можно подключить в ваше приложение всего одной командой. Для этого в Propan предусмотрены BrokerRouter: в можете регистрировать ваши обработчики в рамках роутера, а затем подключать этот роутер в ваш брокер.
Это поможет лучше организовать код вашего приложения, а также позволит разделить его на подключаемые модули.
1 2 3 4 5 6 7 8 9 10 11 |
|
1 2 3 4 5 6 7 8 9 10 11 |
|
1 2 3 4 5 6 7 8 9 10 11 |
|
1 2 3 4 5 6 7 8 9 10 11 |
|
1 2 3 4 5 6 7 8 9 10 11 |
|
При этом префикс роутера будет добавлен к названию очереди ваших обработчиков.
@app.after_startup
async def publish_test():
await broker.publish("user-fake-uuid", "user/created")
Обработка ошибок¶
Однако, все брокеры, поддерживающие механизм подтверждения получения сообщений, имеют в методе @broker.handle
флаг retry
, который отвечает за логику обработки ошибок.
По умолчанию этот флаг имеет значение False
, который говорит о том, что если в ходе обработки сообщения возникла ошибка, оно все равно будет извлечено из очереди.
@broker.handle("test", retry=False) # не обрабатывать исключения
async def base_handler(body: str):
...
При установке этого флага в True
сообщение будет помещаться обратно в очередь при возникновении ошибки бесконечно. При этом сообщение может уйти в обработку как другому потребителю (если их несколько), так и тому же самому.
@broker.handle("test", retry=True) # пробовать заново бесконечно
async def base_handler(body: str):
...
При установке в качестве флага значения int
, количество повторных попыток будет ограничено этим числом.
@broker.handle("test", retry=3) # сделать до 3ех попыток
async def base_handler(body: str):
...
Bug
На данный момент учитываются попытки только в рамках текущего потребителя. Если сообщение уйдет другому потребителю, у того будет свой счетчик. Впоследствии эта логика будет переработана.
Tip
Для более сложных вариантов обработки ошибок вы можете использовать tenacity