Skip to content

Create Custom Propan Broker

Если вы хотите помочь мне с развитием проекта и разработать PropanBroker для еще не поддерживаемого брокера сообщений из плана или вы просто хотите расширить функционал Propan для внутреннего использования, вам пригодится эта инструкция по созданию собственного PropanBroker.

В данном разделе мы разберемся с деталями реализации брокеров на примерах уже существующих в Propan.

Родительский класс

Все брокеры Propan наследуются от родительского класса propan.brokers.model.BrokerAsyncUsecase.

Для того, чтобы создания полноценного брокера необходимо отнаследоваться от этого класса и реализовать все его абстрактные методы.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from typing import Any, Callable, Optional, TypeVar

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.push_back_watcher import BaseWatcher
from propan.types import HandlerWrapper, SendableMessage


T = TypeVar("T")


class MyBroker(BrokerAsyncUsecase):
    async def _connect(self, *args: Any, **kwargs: Any) -> Any:
        pass

    async def close(self) -> None:
        pass

    def handle(self, *args: Any, **kwargs: Any) -> HandlerWrapper:
        pass

    async def start(self) -> None:
        pass

    async def _parse_message(self, message: Any) -> PropanMessage:
        pass

    async def _process_message(
        self,
        func: Callable[[PropanMessage], T],
        watcher: Optional[BaseWatcher],
    ) -> Callable[[PropanMessage], T]:
        pass

    async def publish(
        self,
        message: SendableMessage,
        *args: Any,
        callback: bool = False,
        callback_timeout: Optional[float] = None,
        raise_timeout: bool = False,
        **kwargs: Any,
    ) -> Any:
        pass

Разберемся со всем по порядку.

Подключение к брокеру сообщений

За жизненный цикл вашего брокера отвечают два ключевых метода: _connect и close. После их реализации приложение с вашим адаптером уже должно корректно запуститься и подключиться к брокеру сообщений (но еще не обрабатывать сообщения).

_connect

Метод _connect инициализирует подключение к вашему брокеру сообщений и возвращает объект этого подключения, который позже будет доступен как self._connection.

Tip

Если ваш брокер требует инициализации дополнительных объектов вы также должны инициализировать их в этом методе.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
from typing import Any, Optional


import aio_pika
from propan.brokers._model import BrokerAsyncUsecase

class RabbitBroker(BrokerAsyncUsecase):
    _connection: Optional[aio_pika.RobustConnection]
    _channel: Optional[aio_pika.RobustChannel]

    async def _connect(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> aio_pika.RobustConnection:
        connection = await aio_pika.connect_robust(
            *args, **kwargs, loop=asyncio.get_event_loop()
        )

        if self._channel is None:
            self._channel = await connection.channel()

        return connection

Note

args и kwargs передадутся в ваш метод либо из параметров __init__, либо из параметров метода connect. Логика разрешения этих аргументов реализована в родительском классе, вам не нужно об этом волноваться.

Обратите внимание на следующие строки: в них мы инициализируем специфичный для RabbitBroker объект _channel.

 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class RabbitBroker(BrokerAsyncUsecase):
    _connection: Optional[aio_pika.RobustConnection]
    _channel: Optional[aio_pika.RobustChannel]

    async def _connect(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> aio_pika.RobustConnection:
        connection = await aio_pika.connect_robust(
            *args, **kwargs, loop=asyncio.get_event_loop()
        )

        if self._channel is None:
            self._channel = await connection.channel()

        return connection

close

Теперь нам необходимо корректно завершить работу нашего брокера. Для этого реализуем метод close.

 8
 9
10
11
12
13
14
15
16
17
18
class RabbitBroker(BrokerAsyncUsecase):
    ...

    async def close(self) -> None:
        if self._channel is not None:
            await self._channel.close()
            self._channel = None

        if self._connection is not None:
            await self._connection.close()
            self._connection = None

Note

В родительском методе connect реализация метода _connect вызывается при условии self._connection is not None, поэтому важно после остановки соединения также его и обнулить.

После реализации этих методов приложение с вашим брокером уже должно запускаться.

Регистрация обработчиков

Для того, чтобы ваш брокер начал обрабатывать сообщения необходимо реализовать непосредственно сам метод регистрации обработчика (handle) и метод запуска брокера (start).

Также ваш брокер должен хранить информацию обо всех зарегистрированных обработчиках, поэтому вам будет необходимо реализовать класс Handler, специфичный для каждого брокера.

handle

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from dataclasses import dataclass
from typing import List, Union, Optional

from propan.types import HandlerWrapper, HandlerCallable
from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import BaseHandler
from propan.brokers.rabbit import RabbitExchange, RabbitQueue


@dataclass
class Handler(BaseHandler):
    queue: RabbitQueue
    exchange: Optional[RabbitExchange] = None


class RabbitBroker(BrokerAsyncUsecase):
    handlers: List[Handler]

    def handle(
        self,
        queue: RabbitQueue,
        exchange: Union[RabbitExchange, None] = None,
        *,
        retry: Union[bool, int] = False,
    ) -> HandlerWrapper:
        def wrapper(func: HandlerCallable) -> HandlerCallable:
            func = self._wrap_handler(func, retry=retry)
            handler = Handler(callback=func, queue=queue, exchange=exchange)
            self.handlers.append(handler)

            return func

        return wrapper

В выделенных фрагментах мы сохраняем информацию о зарегистрированных обработчиках внутри нашего брокера.

Также, очень важным моментом является вызова родительского метода _wrap_handler - именно этот метод устанавливает в необъодимом порядке все декораторы, превращающие обычную функцию в обработчик Propan.

27
28
29
30
31
            func = self._wrap_handler(func, retry=retry)
            handler = Handler(callback=func, queue=queue, exchange=exchange)
            self.handlers.append(handler)

            return func

start

В мотоде start мы устанавливает подключение к нашему брокеру сообщений и производим все необходимые операции для запуска наших обработчиков.

Здесь представлен несколько упрощенный код регистрации handler'ов, однако, концепцию он демонстрирует в полной мере.

1
2
3
4
5
6
7
8
9
class RabbitBroker(BrokerAsyncUsecase):
    ...
    async def start(self) -> None:
        await super().start()

        for handler in self.handlers:
            queue = await self._channel.declare_queue(**handler.queue.dict())
            func = handler.callback
            await queue.consume(func)

Здесь возможны два варианта:

  • библиотека, которую мы используем для работы с брокером поддерживает механизм callbacks (как aio-pika, используемая для RabbitMQ)
  • библиотека поддерживает только итерирование по сообщениям

Во втором случае нам повезло меньше и нам нужно превратить цикл в callback. Этого можно допиться, например, используя asyncio.Task, как в примере с Redis. Однако, в таком случае, нужно не забыть корректным образом завершить эти задачи в методе close.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import asyncio
from dataclasses import dataclass
from typing import Any, List, NoReturn, Optional

from redis.asyncio.client import PubSub, Redis

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import BaseHandler


@dataclass
class Handler(BaseHandler):
    channel: str
    pattern: bool = False

    task: Optional["asyncio.Task[Any]"] = None
    subscription: Optional[PubSub] = None


class RedisBroker(BrokerAsyncUsecase):
    handlers: List[Handler]
    _connection: Redis

    async def close(self) -> None:
        for h in self.handlers:
            if h.task is not None:
                h.task.cancel()

            if h.subscription is not None:
                await h.subscription.unsubscribe()
                await h.subscription.reset()

        if self._connection is not None:
            await self._connection.close()
            self._connection = None

    async def start(self) -> None:
        await super().start()

        for handler in self.handlers:
            psub = self._connection.pubsub()
            await psub.subscribe(handler.channel)

            handler.subscription = psub
            handler.task = asyncio.create_task(_consume(handler, psub))


async def _consume(handler: Handler, psub: PubSub) -> NoReturn:
    while True:
        m = await psub.get_message(
            ignore_subscribe_messages=True,
            timeout=1.0,
        )
        if m:
            await handler.callback(m)
        await asyncio.sleep(0.01)

После этого ваш брокер уже должен отправлять получаемые сообщения в функции, декорированные с помощью handle. Однако, пока эти функции будут падать с ошибкой.

Обработка входящих сообщений

Для того, чтобы обработка входящих сообщений завершалась корректным образом, необходимо реализовать еще два метода: _parse_message и _process_message.

_parse_message

Этот метод отвечает за приведение входящего сообщения к типу сообщений Propan.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import aio_pika

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage


class RabbitBroker(BrokerAsyncUsecase):
    ...
    @staticmethod
    async def _parse_message(
        message: aio_pika.message.IncomingMessage,
    ) -> PropanMessage:
        return PropanMessage(
            body=message.body,
            headers=message.headers,
            reply_to=message.reply_to or "",
            message_id=message.message_id,
            content_type=message.content_type or "",
            raw_message=message,
        )

При этом обязательными полями являются только body: bytes и raw_message: Any. Остальные поля могут быть получены как из заголовков входящего сообщения, так и из его тела, если используемый брокер сообщений не имеет встроенных механизмов для передачи соответствующих параметров. Все зависит от вашей реализации метода publish.

_process_message

Здесь все относительно просто: если используемый брокер сообщений поддерживает механизмы ack, nack, то мы должны обрабатывать их здесь. Также в этом месте просходит формирование ответного сообщения и его отправка для поддержки RPC over MQ. Если брокер не поддерживает подтверждение обработки сообщения, то мы просто выполняем наш handler.

Вот, например, вариант с обработкой состояния сообщения:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from functools import wraps
from typing import Optional, TypeVar, Callable

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.push_back_watcher import BaseWatcher, WatcherContext

T = TypeVar("T")

class RabbitBroker(BrokerAsyncUsecase):
    ...
    def _process_message(
        self, func: Callable[[PropanMessage], T], watcher: Optional[BaseWatcher]
    ) -> Callable[[PropanMessage], T]:
        @wraps(func)
        async def wrapper(message: PropanMessage) -> T:
            pika_message = message.raw_message
            if watcher is None:
                context = pika_message.process()
            else:
                context = WatcherContext(
                    watcher,
                    message,
                    on_success=pika_message.ack,
                    on_error=pika_message.nack,
                    on_max=pika_message.reject,
                )

            async with context:
                r = await func(message)
                if message.reply_to:
                    await self.publish(
                        message=r,
                        routing_key=message.reply_to,
                        correlation_id=pika_message.correlation_id,
                    )

                return r

        return wrapper

А вот - без обработки:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from functools import wraps
from typing import Optional, TypeVar, Callable

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.push_back_watcher import BaseWatcher

T = TypeVar("T")

class RedisProcess(BrokerAsyncUsecase):
    ...
    def _process_message(
            self,
            func: Callable[[PropanMessage], T],
            watcher: Optional[BaseWatcher],
        ) -> Callable[[PropanMessage], T]:
            @wraps(func)
            async def wrapper(message: PropanMessage) -> T:
                r = await func(message)
                if message.reply_to:
                    await self.publish(r or "", message.reply_to)
                return r

            return wrapper

P.S: вот так уже будет работать, но без обработки состояния и RPC

def _process_message(
    self, func: Callable[[PropanMessage], T], watcher: Optional[BaseWatcher]
) -> Callable[[PropanMessage], T]:
    @wraps(func)
    async def wrapper(message: PropanMessage) -> T:
        return await func(message)

    return wrapper

Публикация сообщений

Последним шагом нам необходимо реализовать метод отправки сообщения. Это может быть как самый простой этап (если мы не хотим или не можем реализовать RPC сейчас), так и самым сложным и творческим.

В примере ниже я опущу реализацию RPC, так как для каждого брокера необходима своя отдельная реализация. Здесь мы будем просто отправлять сообщения.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing import Optional, Dict, Any

from propan.types import SendableMessage
from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers.redis.schemas import RedisMessage


class RedisProcess(BrokerAsyncUsecase):
    ...
    async def publish(
        self,
        message: SendableMessage = "",
        channel: str = "",
        *,
        reply_to: str = "",
        headers: Optional[Dict[str, Any]] = None,
    ) -> None:
        if self._connection is None:
            raise ValueError("Redis connection not established yet")

        msg, content_type = self._encode_message(message)

        await self._connection.publish(
            channel,
            RedisMessage(
                data=msg,
                headers={
                    "content-type": content_type or "",
                    **(headers or {}),
                },
                reply_to=reply_to,
            ).json(),
        )

Поздравляю, после реализации всех этих методов у вас уже будет брокер, способный корректно отправлять и принимать сообщения.

Логирование

Для того, чтобы ваш брокер логировал входящие сообщения в формате, специфичном для него, необходимо также переопределить несколько методов.

Для начала нужно сбросить стандартный способ логирования, переопределив метод __init__.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from typing import Any, Optional

from propan.brokers._model import BrokerAsyncUsecase


class RabbitBroker(BrokerAsyncUsecase):
    def __init__(
        self,
        *args: Any,
        log_fmt: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, log_fmt=log_fmt, **kwargs)

Затем, вам нужно определить формат логирования

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class RabbitBroker(BrokerAsyncUsecase):
    __max_exchange_len: int
    __max_queue_len: int

    def __init__(
        self,
        *args: Any,
        log_fmt: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, log_fmt=log_fmt, **kwargs)

        self.__max_queue_len = 4
        self.__max_exchange_len = 4

    @property
    def fmt(self) -> str:
        return super().fmt or (
            "%(asctime)s %(levelname)s - "
            f"%(exchange)-{self.__max_exchange_len}s | "
            f"%(queue)-{self.__max_queue_len}s | "
            f"%(message_id)-10s "
            "- %(message)s"
        )

Следующим шагом вам нужно реализовать метод _get_log_context, который будет добавлять в сообщение поля, специфичные для вашего брокера.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class RabbitBroker(BrokerAsyncUsecase):
    def _get_log_context(
        self,
        message: Optional[PropanMessage],
        queue: RabbitQueue,
        exchange: Optional[RabbitExchange] = None,
    ) -> Dict[str, Any]:
        return {
            "queue": queue.name,
            "exchange": exchange.name if exchange else "default",
            **super()._get_log_context(message),
        }

Данный метод всегда принимает первым аргументом message. Остальные аргументы вы должны передать туда сами.

Где? - Прямо в методе handle

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
    ...
    def handle(
        self,
        queue: RabbitQueue,
        exchange: Union[RabbitExchange, None] = None,
        *,
        retry: Union[bool, int] = False,
    ) -> HandlerWrapper:

        def wrapper(func: HandlerCallable) -> HandlerCallable:
            func = self._wrap_handler(
                func,
                queue=queue,
                exchange=exchange,
                retry=retry,
            )
            ....

Все аргументы кастомные, переданные в функцию _wrap_handler, будут в дальнейшем переданы в вашу функцию _get_log_context.

Теперь ваш брокер не только отправляет и принимает сообщения, но и логирует входящие сообщения в собственном формате. Поздравляю, вы - великолепны!

Если вы реализовали подобный брокер для собственного источника, я очень жду ваш PR! Я готов помочь вам с тестированием, реализацией отдельных частей, документацией и всем остальным. Ваш труд обязательно станет частью Propan.