Routing¶
Общее поведение¶
Для объявления функции - обработчика сообщений брокера необходимо использовать декоратор @broker.handle
@broker.handle("test")
async def base_handler(body: str):
    ...
Это поведение схоже для всех брокеров, однако, параметры, передаваемые в @broker.handle - являются специфичными для каждого брокера.
Чтобы узнать подробнее о поведении специализированных брокеров перейдите в следующие разделы:
Отложенная регистрация обработчиков¶
Если вы не хотите смешавать "бизнес-логику" вашего приложения с логикой маршрутизации, вы можете воспользоваться стандартным поведением декораторов "без сахара": просто определите функцию-обработчик, а зарегистрируйте ее позже.
async def base_handler(body: str):
    ...
broker.handle("test")(base_handler)
BrokerRouter¶
Иногда может быть удобно разделить обработчики на группы, которые можно подключить в ваше приложение всего одной командой. Для этого в Propan предусмотрены BrokerRouter: в можете регистрировать ваши обработчики в рамках роутера, а затем подключать этот роутер в ваш брокер.
Это поможет лучше организовать код вашего приложения, а также позволит разделить его на подключаемые модули.
1 2 3 4 5 6 7 8 9 10 11  |  | 
1 2 3 4 5 6 7 8 9 10 11  |  | 
1 2 3 4 5 6 7 8 9 10 11  |  | 
1 2 3 4 5 6 7 8 9 10 11  |  | 
1 2 3 4 5 6 7 8 9 10 11  |  | 
При этом префикс роутера будет добавлен к названию очереди ваших обработчиков.
@app.after_startup
async def publish_test():
    await broker.publish("user-fake-uuid", "user/created")
Обработка ошибок¶
Однако, все брокеры, поддерживающие механизм подтверждения получения сообщений, имеют в методе @broker.handle флаг retry, который отвечает за логику обработки ошибок.
По умолчанию этот флаг имеет значение False, который говорит о том, что если в ходе обработки сообщения возникла ошибка, оно все равно будет извлечено из очереди.
@broker.handle("test", retry=False)  # не обрабатывать исключения
async def base_handler(body: str):
    ...
При установке этого флага в True сообщение будет помещаться обратно в очередь при возникновении ошибки бесконечно. При этом сообщение может уйти в обработку как другому потребителю (если их несколько), так и тому же самому.
@broker.handle("test", retry=True)  # пробовать заново бесконечно
async def base_handler(body: str):
    ...
При установке в качестве флага значения int, количество повторных попыток будет ограничено этим числом.
@broker.handle("test", retry=3)     # сделать до 3ех попыток
async def base_handler(body: str):
    ...
Bug
На данный момент учитываются попытки только в рамках текущего потребителя. Если сообщение уйдет другому потребителю, у того будет свой счетчик. Впоследствии эта логика будет переработана.
Tip
Для более сложных вариантов обработки ошибок вы можете использовать tenacity