Перейти к содержанию

Routing

Общее поведение

Для объявления функции - обработчика сообщений брокера необходимо использовать декоратор @broker.handle

@broker.handle("test")
async def base_handler(body: str):
    ...

Это поведение схоже для всех брокеров, однако, параметры, передаваемые в @broker.handle - являются специфичными для каждого брокера.

Чтобы узнать подробнее о поведении специализированных брокеров перейдите в следующие разделы:

Отложенная регистрация обработчиков

Если вы не хотите смешавать "бизнес-логику" вашего приложения с логикой маршрутизации, вы можете воспользоваться стандартным поведением декораторов "без сахара": просто определите функцию-обработчик, а зарегистрируйте ее позже.

async def base_handler(body: str):
    ...

broker.handle("test")(base_handler)

BrokerRouter

Иногда может быть удобно разделить обработчики на группы, которые можно подключить в ваше приложение всего одной командой. Для этого в Propan предусмотрены BrokerRouter: в можете регистрировать ваши обработчики в рамках роутера, а затем подключать этот роутер в ваш брокер.

Это поможет лучше организовать код вашего приложения, а также позволит разделить его на подключаемые модули.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, RedisBroker, RedisRouter

router = RedisRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = RedisBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, RabbitBroker, RabbitRouter

router = RabbitRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = RabbitBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, KafkaBroker, KafkaRouter

router = KafkaRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = KafkaBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, SQSBroker, SQSRouter

router = SQSRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = SQSBroker()
broker.include_router(router)
app = PropanApp(broker)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from propan import PropanApp, NatsBroker, NatsRouter

router = NatsRouter(prefix="user/")

@router.handle("created")
async def handle_user_created_event(user_id: str):
    ...

broker = NatsBroker()
broker.include_router(router)
app = PropanApp(broker)

При этом префикс роутера будет добавлен к названию очереди ваших обработчиков.

@app.after_startup
async def publish_test():
    await broker.publish("user-fake-uuid", "user/created")

Обработка ошибок

Однако, все брокеры, поддерживающие механизм подтверждения получения сообщений, имеют в методе @broker.handle флаг retry, который отвечает за логику обработки ошибок.

По умолчанию этот флаг имеет значение False, который говорит о том, что если в ходе обработки сообщения возникла ошибка, оно все равно будет извлечено из очереди.

@broker.handle("test", retry=False)  # не обрабатывать исключения
async def base_handler(body: str):
    ...

При установке этого флага в True сообщение будет помещаться обратно в очередь при возникновении ошибки бесконечно. При этом сообщение может уйти в обработку как другому потребителю (если их несколько), так и тому же самому.

@broker.handle("test", retry=True)  # пробовать заново бесконечно
async def base_handler(body: str):
    ...

При установке в качестве флага значения int, количество повторных попыток будет ограничено этим числом.

@broker.handle("test", retry=3)     # сделать до 3ех попыток
async def base_handler(body: str):
    ...

Bug

На данный момент учитываются попытки только в рамках текущего потребителя. Если сообщение уйдет другому потребителю, у того будет свой счетчик. Впоследствии эта логика будет переработана.

Tip

Для более сложных вариантов обработки ошибок вы можете использовать tenacity