Если вы хотите помочь мне с развитием проекта и разработать PropanBroker для еще не поддерживаемого брокера сообщений из плана
или вы просто хотите расширить функционал Propan для внутреннего использования, вам пригодится эта инструкция по созданию собственного PropanBroker.
В данном разделе мы разберемся с деталями реализации брокеров на примерах уже существующих в Propan.
За жизненный цикл вашего брокера отвечают два ключевых метода: _connect и close. После их реализации приложение с вашим адаптером
уже должно корректно запуститься и подключиться к брокеру сообщений (но еще не обрабатывать сообщения).
Метод _connect инициализирует подключение к вашему брокеру сообщений и возвращает объект этого подключения, который позже будет доступен как self._connection.
Tip
Если ваш брокер требует инициализации дополнительных объектов вы также должны инициализировать их в этом методе.
args и kwargs передадутся в ваш метод либо из параметров __init__, либо из параметров метода connect. Логика разрешения этих аргументов реализована в родительском классе, вам не нужно об этом волноваться.
Обратите внимание на следующие строки: в них мы инициализируем специфичный для RabbitBroker объект _channel.
В родительском методе connect реализация метода _connect вызывается при условии self._connection is not None, поэтому важно после остановки соединения также его и обнулить.
После реализации этих методов приложение с вашим брокером уже должно запускаться.
Для того, чтобы ваш брокер начал обрабатывать сообщения необходимо реализовать непосредственно сам метод регистрации обработчика (handle) и метод запуска брокера (start).
Также ваш брокер должен хранить информацию обо всех зарегистрированных обработчиках, поэтому вам будет необходимо реализовать класс Handler, специфичный для каждого брокера.
В выделенных фрагментах мы сохраняем информацию о зарегистрированных обработчиках внутри нашего брокера.
Также, очень важным моментом является вызова родительского метода _wrap_handler - именно этот метод устанавливает в необъодимом порядке все декораторы, превращающие обычную функцию в обработчик Propan.
библиотека, которую мы используем для работы с брокером поддерживает механизм callbacks (как aio-pika, используемая для RabbitMQ)
библиотека поддерживает только итерирование по сообщениям
Во втором случае нам повезло меньше и нам нужно превратить цикл в callback. Этого можно допиться, например, используя asyncio.Task, как в примере с Redis. Однако, в таком случае, нужно не забыть корректным образом завершить эти задачи в методе close.
После этого ваш брокер уже должен отправлять получаемые сообщения в функции, декорированные с помощью handle. Однако, пока эти функции будут падать с ошибкой.
При этом обязательными полями являются только body: bytes и raw_message: Any. Остальные поля могут быть получены как из заголовков входящего сообщения, так и из его тела, если используемый брокер сообщений не имеет встроенных механизмов для передачи соответствующих параметров. Все зависит от вашей реализации метода publish.
Здесь все относительно просто: если используемый брокер сообщений поддерживает механизмы ack, nack, то мы должны обрабатывать их здесь. Также в этом месте просходит формирование ответного сообщения и его отправка для поддержки RPC over MQ. Если брокер не поддерживает подтверждение обработки сообщения, то мы просто выполняем наш handler.
Вот, например, вариант с обработкой состояния сообщения:
Последним шагом нам необходимо реализовать метод отправки сообщения. Это может быть как самый простой этап (если мы не хотим или не можем реализовать RPC сейчас), так и самым сложным и творческим.
В примере ниже я опущу реализацию RPC, так как для каждого брокера необходима своя отдельная реализация. Здесь мы будем просто отправлять сообщения.
fromtypingimportOptional,Dict,Anyfrompropan.typesimportSendableMessagefrompropan.brokers._modelimportBrokerAsyncUsecasefrompropan.brokers.redis.schemasimportRedisMessageclassRedisProcess(BrokerAsyncUsecase):...asyncdefpublish(self,message:SendableMessage="",channel:str="",*,reply_to:str="",headers:Optional[Dict[str,Any]]=None,)->None:ifself._connectionisNone:raiseValueError("Redis connection not established yet")msg,content_type=self._encode_message(message)awaitself._connection.publish(channel,RedisMessage(data=msg,headers={"content-type":content_typeor"",**(headersor{}),},reply_to=reply_to,).json(),)
Поздравляю, после реализации всех этих методов у вас уже будет брокер, способный корректно отправлять и принимать сообщения.
Все аргументы кастомные, переданные в функцию _wrap_handler, будут в дальнейшем переданы в вашу функцию _get_log_context.
Теперь ваш брокер не только отправляет и принимает сообщения, но и логирует входящие сообщения в собственном формате. Поздравляю, вы - великолепны!
Если вы реализовали подобный брокер для собственного источника, я очень жду ваш PR! Я готов помочь вам с тестированием, реализацией отдельных частей, документацией и всем остальным. Ваш труд обязательно станет частью Propan.