Create Custom Propan Broker

If you want to help me develop the project by writing a new PropanBroker for a message broker that is in the plan but not yet supported, or you just want to extend Propan's functionality for internal use, these instructions should be helpful to you.

In this section, we will go through the details of the implementation of brokers using examples from Propan.

Parent class

All Propan brokers inherit from the parent class propan.brokers._model.BrokerAsyncUsecase.

In order to create a broker, it is necessary to inherit from this class and implement all its abstract methods.

from typing import Any, Callable, Optional, TypeVar

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.push_back_watcher import BaseWatcher
from propan.types import HandlerWrapper, SendableMessage


T = TypeVar("T")


class MyBroker(BrokerAsyncUsecase):
    async def _connect(self, *args: Any, **kwargs: Any) -> Any:
        pass

    async def close(self) -> None:
        pass

    def handle(self, *args: Any, **kwargs: Any) -> HandlerWrapper:
        pass

    async def start(self) -> None:
        pass

    async def _parse_message(self, message: Any) -> PropanMessage:
        pass

    def _process_message(
        self,
        func: Callable[[PropanMessage], T],
        watcher: Optional[BaseWatcher],
    ) -> Callable[[PropanMessage], T]:
        pass

    async def publish(
        self,
        message: SendableMessage,
        *args: Any,
        callback: bool = False,
        callback_timeout: Optional[float] = None,
        raise_timeout: bool = False,
        **kwargs: Any,
    ) -> Any:
        pass

Let's tackle each method one by one.

Connecting to a message broker

Two key methods, _connect and close, are responsible for the lifespan of your broker connection. Once these are implemented, an application with your adapter should initialize correctly and establish a connection with the message broker (but it will not process messages just yet).

_connect

The _connect method initializes the connection to your message broker and returns the connection object, which will afterwards be available as self._connection.

Tip

If your broker requires the initialization of additional objects, they should be instantiated within this method as well.

import asyncio
from typing import Any, Optional


import aio_pika
from propan.brokers._model import BrokerAsyncUsecase

class RabbitBroker(BrokerAsyncUsecase):
    _connection: Optional[aio_pika.RobustConnection]
    _channel: Optional[aio_pika.RobustChannel]

    async def _connect(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> aio_pika.RobustConnection:
        connection = await aio_pika.connect_robust(
            *args, **kwargs, loop=asyncio.get_event_loop()
        )

        if self._channel is None:
            self._channel = await connection.channel()

        return connection

Note

args and kwargs will be passed to your method from either the __init__ or connect methods' arguments. The logic to resolve these arguments is implemented in the parent class, so you don't have to worry about it.

Pay attention to the following lines, where we initialize the _channel object, which is specific to RabbitBroker:

        if self._channel is None:
            self._channel = await connection.channel()

close

Now, to shut down our broker properly, we implement the close method.

class RabbitBroker(BrokerAsyncUsecase):
    ...

    async def close(self) -> None:
        if self._channel is not None:
            await self._channel.close()
            self._channel = None

        if self._connection is not None:
            await self._connection.close()
            self._connection = None

Note

In the parent's connect method, the _connect method is invoked only when self._connection is None. Therefore, it is important to set self._connection to None after terminating the connection.
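To see why the reset matters, here is a minimal sketch of a lazy connection guard. SketchBroker and FakeConnection are hypothetical stand-ins written for this illustration, not Propan's actual classes, and the real parent class may differ in its details.

```python
import asyncio
from typing import Any, Optional


class FakeConnection:
    """Hypothetical stand-in for a real client connection."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SketchBroker:
    """Illustrative only: mimics a parent class that connects lazily."""

    def __init__(self) -> None:
        self._connection: Optional[FakeConnection] = None
        self.connect_calls = 0

    async def connect(self, *args: Any, **kwargs: Any) -> FakeConnection:
        # _connect runs only while no connection is stored
        if self._connection is None:
            self._connection = await self._connect(*args, **kwargs)
        return self._connection

    async def _connect(self, *args: Any, **kwargs: Any) -> FakeConnection:
        self.connect_calls += 1
        return FakeConnection()

    async def close(self) -> None:
        if self._connection is not None:
            await self._connection.close()
            # Without this reset, connect() would keep returning the closed connection
            self._connection = None


async def demo() -> tuple:
    broker = SketchBroker()
    first = await broker.connect()
    same = await broker.connect()    # the guard skips _connect
    await broker.close()
    second = await broker.connect()  # the reset allows a fresh connection
    return first is same, first.closed, second.closed, broker.connect_calls


result = asyncio.run(demo())
```

In this sketch, _connect is called twice in total: once for the first connect and once after close has reset the stored connection.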

Once these methods are implemented, an application with your broker should be able to run successfully.

Register handlers

In order for your broker to start processing messages, it is necessary to implement the handler registration method itself (handle) and the broker launch method (start).

Also, your broker must store information about all registered handlers, so you will need to implement a Handler class specific to your broker.

handle

from dataclasses import dataclass
from typing import List, Union, Optional

from propan.types import HandlerWrapper, HandlerCallable
from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import BaseHandler
from propan.brokers.rabbit import RabbitExchange, RabbitQueue


@dataclass
class Handler(BaseHandler):
    queue: RabbitQueue
    exchange: Optional[RabbitExchange] = None


class RabbitBroker(BrokerAsyncUsecase):
    handlers: List[Handler]

    def handle(
        self,
        queue: RabbitQueue,
        exchange: Union[RabbitExchange, None] = None,
        *,
        retry: Union[bool, int] = False,
    ) -> HandlerWrapper:
        def wrapper(func: HandlerCallable) -> HandlerCallable:
            func = self._wrap_handler(func, retry=retry)
            handler = Handler(callback=func, queue=queue, exchange=exchange)
            self.handlers.append(handler)

            return func

        return wrapper

In the fragment repeated below, we store information about registered handlers inside our broker.

Additionally, it's crucial to call the parent method _wrap_handler. This arranges all decorators in the correct order, transforming the original function into a Propan handler.

            func = self._wrap_handler(func, retry=retry)
            handler = Handler(callback=func, queue=queue, exchange=exchange)
            self.handlers.append(handler)

            return func
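The registration pattern above can be tried out without a real broker. The following is a minimal sketch in which SketchBroker, SketchHandler, and the pass-through _wrap_handler are hypothetical simplifications; Propan's own _wrap_handler does considerably more than this placeholder.

```python
import asyncio
from dataclasses import dataclass
from functools import wraps
from typing import Any, Awaitable, Callable, List

HandlerFunc = Callable[[Any], Awaitable[Any]]


@dataclass
class SketchHandler:
    callback: HandlerFunc
    queue: str


class SketchBroker:
    def __init__(self) -> None:
        self.handlers: List[SketchHandler] = []

    def _wrap_handler(self, func: HandlerFunc) -> HandlerFunc:
        # Placeholder for the parent's wrapping logic (hypothetical behavior)
        @wraps(func)
        async def wrapped(message: Any) -> Any:
            return await func(message)

        return wrapped

    def handle(self, queue: str) -> Callable[[HandlerFunc], HandlerFunc]:
        def wrapper(func: HandlerFunc) -> HandlerFunc:
            # Wrap first, then remember the wrapped callable for start()
            func = self._wrap_handler(func)
            self.handlers.append(SketchHandler(callback=func, queue=queue))
            return func

        return wrapper


broker = SketchBroker()


@broker.handle("orders")
async def on_order(message: Any) -> str:
    return f"got {message}"


stored = broker.handlers[0]
reply = asyncio.run(stored.callback("ping"))
```

The point of the sketch is that the broker stores the wrapped function, not the original one, so whatever the start method consumes later already includes the wrapping.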

start

In the start method, we establish a connection to our message broker and perform all the necessary operations to launch our handlers.

Here is a somewhat simplified version of the code that registers the handlers; it still demonstrates the concept in full.

class RabbitBroker(BrokerAsyncUsecase):
    ...
    async def start(self) -> None:
        await super().start()

        for handler in self.handlers:
            queue = await self._channel.declare_queue(**handler.queue.dict())
            func = handler.callback
            await queue.consume(func)

There are two possible options here:

  • the library we use to work with the broker supports the callbacks mechanism (like aio-pika does for RabbitMQ)
  • the library supports message iteration only

In the second case, we are less lucky and need to convert the loop into a callback. This can be done, for example, with asyncio.Task, as in the Redis example. In this case, do not forget to cancel these tasks correctly in the close method.

import asyncio
from dataclasses import dataclass
from typing import Any, List, NoReturn, Optional

from redis.asyncio.client import PubSub, Redis

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import BaseHandler


@dataclass
class Handler(BaseHandler):
    channel: str
    pattern: bool = False

    task: Optional["asyncio.Task[Any]"] = None
    subscription: Optional[PubSub] = None


class RedisBroker(BrokerAsyncUsecase):
    handlers: List[Handler]
    _connection: Optional[Redis]

    async def close(self) -> None:
        for h in self.handlers:
            if h.task is not None:
                h.task.cancel()

            if h.subscription is not None:
                await h.subscription.unsubscribe()
                await h.subscription.reset()

        if self._connection is not None:
            await self._connection.close()
            self._connection = None

    async def start(self) -> None:
        await super().start()

        for handler in self.handlers:
            psub = self._connection.pubsub()
            await psub.subscribe(handler.channel)

            handler.subscription = psub
            handler.task = asyncio.create_task(_consume(handler, psub))


async def _consume(handler: Handler, psub: PubSub) -> NoReturn:
    while True:
        m = await psub.get_message(
            ignore_subscribe_messages=True,
            timeout=1.0,
        )
        if m:
            await handler.callback(m)
        await asyncio.sleep(0.01)
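One possible refinement of the cancellation step, offered as a sketch and not as Propan's implementation, is to wait for the cancelled tasks to finish before closing the connection. The example below uses an asyncio.Queue as a hypothetical stand-in for a message source; asyncio.gather with return_exceptions=True collects the CancelledError instead of raising it.

```python
import asyncio
from typing import List


async def consume(source: "asyncio.Queue[str]", received: List[str]) -> None:
    # Iteration-style consumption turned into a long-running task
    while True:
        item = await source.get()
        received.append(item)


async def demo() -> tuple:
    source: "asyncio.Queue[str]" = asyncio.Queue()
    received: List[str] = []
    task = asyncio.create_task(consume(source, received))

    await source.put("a")
    await source.put("b")
    await asyncio.sleep(0.01)  # give the consumer a chance to run

    # Request cancellation, then wait until the task has actually stopped
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return received, task.cancelled()


received, cancelled = asyncio.run(demo())
```

Waiting on the tasks this way ensures no consumer is still running when the underlying connection is closed.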

After that, your broker should deliver each received message to the functions decorated with handle. However, these functions will still fail with an error until the two methods described in the next section are implemented.

Processing incoming messages

In order for incoming messages to be processed correctly, two more methods must be implemented: _parse_message and _process_message.

_parse_message

This method converts an incoming message to the Propan message type.

import aio_pika

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage


class RabbitBroker(BrokerAsyncUsecase):
    ...
    @staticmethod
    async def _parse_message(
        message: aio_pika.message.IncomingMessage,
    ) -> PropanMessage:
        return PropanMessage(
            body=message.body,
            headers=message.headers,
            reply_to=message.reply_to or "",
            message_id=message.message_id,
            content_type=message.content_type or "",
            raw_message=message,
        )

In this case, only body: bytes and raw_message: Any are required fields. If the message broker has no built-in mechanism for transmitting the other parameters, they can be obtained from the incoming message's headers or from its body. It all depends on your implementation of the publish method.
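As an illustration of carrying the extra fields inside the body, here is a minimal sketch that unpacks a JSON envelope. SketchMessage is a hypothetical stand-in for PropanMessage, and the envelope keys ("data", "headers", "reply_to") are an assumed wire format chosen for this example only.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchMessage:
    """Hypothetical stand-in with the fields discussed in the text."""

    body: bytes
    raw_message: Any
    headers: Dict[str, Any] = field(default_factory=dict)
    reply_to: str = ""
    content_type: str = ""


def parse_envelope(raw: bytes) -> SketchMessage:
    # Assumed wire format: a JSON object with "data", "headers", "reply_to"
    envelope = json.loads(raw)
    headers = envelope.get("headers", {})
    return SketchMessage(
        body=envelope.get("data", "").encode(),
        raw_message=raw,
        headers=headers,
        reply_to=envelope.get("reply_to", ""),
        content_type=headers.get("content-type", ""),
    )


raw = json.dumps(
    {
        "data": "hello",
        "headers": {"content-type": "text/plain"},
        "reply_to": "replies",
    }
).encode()
parsed = parse_envelope(raw)
```

Whatever format is chosen, the publishing side has to produce the same envelope that the parsing side expects.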

_process_message

Everything is relatively simple here: if the message broker supports the ack and nack mechanisms, we should handle them here. Response publishing should also be implemented here to support RPC over MQ. If the broker does not support acknowledging message processing, we simply execute our handler.

Here, for example, is an option with message status processing:

from functools import wraps
from typing import Optional, TypeVar, Callable

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.push_back_watcher import BaseWatcher, WatcherContext

T = TypeVar("T")

class RabbitBroker(BrokerAsyncUsecase):
    ...
    def _process_message(
        self, func: Callable[[PropanMessage], T], watcher: Optional[BaseWatcher]
    ) -> Callable[[PropanMessage], T]:
        @wraps(func)
        async def wrapper(message: PropanMessage) -> T:
            pika_message = message.raw_message
            if watcher is None:
                context = pika_message.process()
            else:
                context = WatcherContext(
                    watcher,
                    message,
                    on_success=pika_message.ack,
                    on_error=pika_message.nack,
                    on_max=pika_message.reject,
                )

            async with context:
                r = await func(message)
                if message.reply_to:
                    await self.publish(
                        message=r,
                        routing_key=message.reply_to,
                        correlation_id=pika_message.correlation_id,
                    )

                return r

        return wrapper

And here is an option without message status processing:

from functools import wraps
from typing import Optional, TypeVar, Callable

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.push_back_watcher import BaseWatcher

T = TypeVar("T")

class RedisBroker(BrokerAsyncUsecase):
    ...
    def _process_message(
        self,
        func: Callable[[PropanMessage], T],
        watcher: Optional[BaseWatcher],
    ) -> Callable[[PropanMessage], T]:
        @wraps(func)
        async def wrapper(message: PropanMessage) -> T:
            r = await func(message)
            if message.reply_to:
                await self.publish(r or "", message.reply_to)
            return r

        return wrapper

P.S.: the following minimal version is also valid, but it provides neither message status processing nor RPC support.

def _process_message(
    self, func: Callable[[PropanMessage], T], watcher: Optional[BaseWatcher]
) -> Callable[[PropanMessage], T]:
    @wraps(func)
    async def wrapper(message: PropanMessage) -> T:
        return await func(message)

    return wrapper
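The acknowledgement handling described in this section boils down to "ack on success, nack on error". A minimal sketch of that idea as an async context manager follows; FakeDelivery and acknowledge are hypothetical names for this illustration and are not part of Propan or aio-pika.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeDelivery:
    """Hypothetical message object that records its acknowledgement state."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def ack(self) -> None:
        self.events.append("ack")

    async def nack(self) -> None:
        self.events.append("nack")


@asynccontextmanager
async def acknowledge(delivery: FakeDelivery) -> AsyncIterator[None]:
    try:
        yield
    except Exception:
        # The handler failed: reject the message and re-raise the error
        await delivery.nack()
        raise
    else:
        # The handler finished normally: confirm the message
        await delivery.ack()


async def demo() -> tuple:
    ok, failed = FakeDelivery(), FakeDelivery()

    async with acknowledge(ok):
        pass  # a handler that succeeds

    try:
        async with acknowledge(failed):
            raise ValueError("handler failed")
    except ValueError:
        pass

    return ok.events, failed.events


events = asyncio.run(demo())
```

A retry-aware variant would additionally count failures per message and reject the message once a limit is reached, which is the role the watcher plays in the RabbitMQ example.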

Publishing messages

The last step is to implement a method for sending messages. This can be either the simplest stage (if we don't want to or can't implement RPC right now) or the most complex and creative one.

In the example below, I will omit the implementation of RPC, since each broker needs its own implementation. We will just send messages here.

from typing import Optional, Dict, Any

from propan.types import SendableMessage
from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers.redis.schemas import RedisMessage


class RedisBroker(BrokerAsyncUsecase):
    ...
    async def publish(
        self,
        message: SendableMessage = "",
        channel: str = "",
        *,
        reply_to: str = "",
        headers: Optional[Dict[str, Any]] = None,
    ) -> None:
        if self._connection is None:
            raise ValueError("Redis connection not established yet")

        msg, content_type = self._encode_message(message)

        await self._connection.publish(
            channel,
            RedisMessage(
                data=msg,
                headers={
                    "content-type": content_type or "",
                    **(headers or {}),
                },
                reply_to=reply_to,
            ).json(),
        )
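For readers wondering what an RPC layer on top of publish might involve, one common pattern is to match replies to requests by a correlation id. The sketch below is only an in-memory illustration of that pattern: SketchRpcClient, its _send stand-in, and the upper-casing "server" are all hypothetical and do not reflect how any Propan broker implements RPC.

```python
import asyncio
import uuid
from typing import Dict


class SketchRpcClient:
    """Illustrative only: matches replies to requests by correlation id."""

    def __init__(self) -> None:
        self._pending: Dict[str, "asyncio.Future[str]"] = {}

    async def _send(self, body: str, correlation_id: str) -> None:
        # Stand-in for the network: a fake server replies on the next loop step
        asyncio.get_running_loop().call_soon(
            self.on_reply, body.upper(), correlation_id
        )

    def on_reply(self, body: str, correlation_id: str) -> None:
        # Resolve the future that is waiting for this correlation id, if any
        future = self._pending.pop(correlation_id, None)
        if future is not None and not future.done():
            future.set_result(body)

    async def publish(self, body: str, callback_timeout: float = 1.0) -> str:
        correlation_id = uuid.uuid4().hex
        future: "asyncio.Future[str]" = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self._send(body, correlation_id)
        try:
            return await asyncio.wait_for(future, callback_timeout)
        finally:
            # Drop the entry even if the wait timed out
            self._pending.pop(correlation_id, None)


response = asyncio.run(SketchRpcClient().publish("ping"))
```

In a real broker, on_reply would be driven by a consumer on the reply queue or channel, and the timeout behavior would follow the callback_timeout and raise_timeout arguments of publish.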

Congratulations, after implementing all these methods, you will have a broker capable of correctly sending and receiving messages.

Logging

In order to log incoming messages in a broker-specific format, you also need to override several methods.

First, you need to reset the standard logging method by overriding the __init__ method.

from typing import Any, Optional

from propan.brokers._model import BrokerAsyncUsecase


class RabbitBroker(BrokerAsyncUsecase):
    def __init__(
        self,
        *args: Any,
        log_fmt: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, log_fmt=log_fmt, **kwargs)

Then, you should define a logging format:

class RabbitBroker(BrokerAsyncUsecase):
    __max_exchange_len: int
    __max_queue_len: int

    def __init__(
        self,
        *args: Any,
        log_fmt: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, log_fmt=log_fmt, **kwargs)

        self.__max_queue_len = 4
        self.__max_exchange_len = 4

    @property
    def fmt(self) -> str:
        return super().fmt or (
            "%(asctime)s %(levelname)s - "
            f"%(exchange)-{self.__max_exchange_len}s | "
            f"%(queue)-{self.__max_queue_len}s | "
            f"%(message_id)-10s "
            "- %(message)s"
        )
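The padded fields in such a format string can be tried out with the standard logging module alone. This is a small sketch with made-up widths and logger name; the exchange and queue values are supplied through the extra argument, which is how custom fields reach a %-style formatter.

```python
import io
import logging

# Hypothetical column widths, chosen only for this illustration
max_exchange_len = 8
max_queue_len = 6

fmt = (
    "%(levelname)s - "
    f"%(exchange)-{max_exchange_len}s | "
    f"%(queue)-{max_queue_len}s "
    "- %(message)s"
)

# Log into an in-memory stream so the formatted line can be inspected
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(fmt))

logger = logging.getLogger("broker_format_sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

# Custom fields are passed through `extra` and left-aligned by the "-Ns" spec
logger.info("Received", extra={"exchange": "default", "queue": "test"})

line = stream.getvalue().rstrip("\n")
```

Each "-Ns" specifier left-aligns its value and pads it with spaces to N characters, which keeps the columns aligned across log lines.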

The next step is to implement the _get_log_context method, which adds broker-specific fields to the log message.

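from typing import Any, Dict, Optional

from propan.brokers._model import BrokerAsyncUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.rabbit import RabbitExchange, RabbitQueue


class RabbitBroker(BrokerAsyncUsecase):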
    def _get_log_context(
        self,
        message: Optional[PropanMessage],
        queue: RabbitQueue,
        exchange: Optional[RabbitExchange] = None,
    ) -> Dict[str, Any]:
        return {
            "queue": queue.name,
            "exchange": exchange.name if exchange else "default",
            **super()._get_log_context(message),
        }

This method always takes message as the first argument. You must pass the other arguments to it yourself.

Where? Right in the handle method:

    ...
    def handle(
        self,
        queue: RabbitQueue,
        exchange: Union[RabbitExchange, None] = None,
        *,
        retry: Union[bool, int] = False,
    ) -> HandlerWrapper:

        def wrapper(func: HandlerCallable) -> HandlerCallable:
            func = self._wrap_handler(
                func,
                queue=queue,
                exchange=exchange,
                retry=retry,
            )
            ...

All custom arguments passed to the _wrap_handler function will be further passed to your _get_log_context method.

Now your broker not only sends and receives messages, but also logs incoming messages in its own format. Congratulations, you are breathtaking!

Success

If you have implemented a broker for your message source, I am waiting for your PR! I am ready to help you with testing, the implementation of specific parts, documentation, and everything else. Your work will definitely become a part of Propan.