If you want to help develop the project by writing a new PropanBroker for a message broker from the plan that is not yet supported, or you just want to extend Propan for internal use, these instructions should be helpful.
In this section, we will go through the details of the implementation of brokers using examples from Propan.
Two key methods, _connect and close, are responsible for the lifespan of your broker connection. Once these are implemented, an application using your adapter should initialize correctly and establish a connection with the message broker, although it will not process messages yet.
The _connect method initializes the connection to your message broker and returns the connection object, which will afterwards be available as self._connection.
Tip
If your broker requires the initialization of additional objects, they should be instantiated within this method as well.
args and kwargs will be passed to your method from either the __init__ or connect methods' arguments. The logic to resolve these arguments is implemented in the parent class, so you don't have to worry about it.
Pay attention to the following lines: here, we initialize the _channel object, which is specific to the RabbitBroker.
The parent's connect method invokes _connect only when self._connection is None. Therefore, it is important to set self._connection to None after terminating the connection; otherwise a later connect call will not reconnect.
Once these methods are implemented, an application with your broker should be able to run successfully.
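The connection lifecycle described above can be sketched as follows. This is a minimal, self-contained illustration, not Propan's actual code: SketchBroker, FakeConnection and MyBroker are hypothetical stand-ins, and the way arguments from __init__ and connect are merged is an assumption made for the example.

```python
import asyncio
from typing import Any, Optional


class SketchBroker:
    """Hypothetical stand-in for the parent class (not Propan's real base class)."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self._connection: Optional[Any] = None
        self._init_args = args
        self._init_kwargs = kwargs

    async def connect(self, *args: Any, **kwargs: Any) -> Any:
        # _connect runs only while no connection exists
        if self._connection is None:
            # Assumption: arguments given to connect() win over the __init__ ones
            self._connection = await self._connect(
                *(args or self._init_args),
                **{**self._init_kwargs, **kwargs},
            )
        return self._connection

    async def _connect(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError


class FakeConnection:
    """Hypothetical in-memory connection used instead of a real client library."""

    def __init__(self, url: str) -> None:
        self.url = url
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class MyBroker(SketchBroker):
    async def _connect(self, url: str = "memory://", **kwargs: Any) -> FakeConnection:
        # Any additional broker-specific objects would be created here as well
        return FakeConnection(url)

    async def close(self) -> None:
        if self._connection is not None:
            await self._connection.close()
            # Reset so that a later connect() call reconnects
            self._connection = None


async def demo():
    broker = MyBroker("memory://localhost")
    first = await broker.connect()
    same = await broker.connect()    # reuses the existing connection
    await broker.close()
    second = await broker.connect()  # a new connection after close()
    return first, same, second
```

Running asyncio.run(demo()) shows that a repeated connect call reuses the connection, while a connect call after close creates a new one.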
In order for your broker to start processing messages, it is necessary to implement the handler registration method itself (handle) and the broker launch method (start).
Also, your broker must store information about all registered handlers, so you will need to implement a Handler class specific to each broker.
In the highlighted fragments, we store information about registered handlers inside our broker.
Additionally, it's crucial to call the parent method _wrap_handler. This arranges all decorators in the correct order, transforming the original function into a Propan handler.
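Handler registration can be sketched roughly as below. The Handler fields, the SketchBroker class and the trivial _wrap_handler body are assumptions made for illustration; in a real broker the parent's _wrap_handler does the actual decoration work.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Handler:
    """Hypothetical per-broker handler record; real fields depend on your broker."""

    callback: Callable[..., Any]
    channel: str


class SketchBroker:
    """Hypothetical stand-in, not Propan's real base class."""

    def __init__(self) -> None:
        self.handlers: List[Handler] = []

    def _wrap_handler(self, func: Callable[..., Any]) -> Callable[..., Any]:
        # Stand-in for the parent method that arranges the decorators
        def wrapped(message: Any) -> Any:
            return func(message)

        return wrapped

    def handle(self, channel: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            wrapped = self._wrap_handler(func)
            # Store the registered handler so that start() can subscribe it later
            self.handlers.append(Handler(callback=wrapped, channel=channel))
            return wrapped

        return decorator
```

With this sketch, decorating a function with broker.handle("test") appends one Handler whose callback is the wrapped function.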
How the start method is implemented depends on the library we use to work with the broker: either it supports a callback mechanism (as aio-pika does for RabbitMQ), or it supports message iteration only.
In the second case, we were less lucky, so we need to convert the loop into a callback. This can be done, for example, using asyncio.Task, as in the Redis example. However, in this case, do not forget to correctly cancel these tasks in the close method.
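A minimal sketch of turning an iteration-only client into callbacks with asyncio.Task is shown below. The fake_subscription generator and the SketchBroker class are hypothetical stand-ins; a real client library will expose its own way to iterate over messages.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


async def fake_subscription(messages: List[bytes]) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for a client that supports iteration only."""
    for message in messages:
        yield message
    # A real subscription keeps waiting for new messages; emulate that
    await asyncio.Event().wait()


async def consume(
    source: AsyncIterator[bytes],
    callback: Callable[[bytes], Awaitable[None]],
) -> None:
    # The loop is the only thing the client gives us; call the handler per message
    async for message in source:
        await callback(message)


class SketchBroker:
    def __init__(self) -> None:
        self._tasks: List[asyncio.Task] = []

    async def start(
        self,
        source: AsyncIterator[bytes],
        callback: Callable[[bytes], Awaitable[None]],
    ) -> None:
        # Run the loop in the background so that start() returns immediately
        self._tasks.append(asyncio.create_task(consume(source, callback)))

    async def close(self) -> None:
        for task in self._tasks:
            task.cancel()
        # Wait until every task has actually finished its cancellation
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()


async def demo():
    received: List[bytes] = []

    async def on_message(body: bytes) -> None:
        received.append(body)

    broker = SketchBroker()
    await broker.start(fake_subscription([b"a", b"b"]), on_message)
    while len(received) < 2:
        await asyncio.sleep(0)  # let the background task run
    tasks = list(broker._tasks)
    await broker.close()
    return received, all(task.cancelled() for task in tasks)
```

Gathering the cancelled tasks in close ensures the consuming loops have really stopped before the connection itself is torn down.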
In this case, only body: bytes and raw_message: Any are required fields. If the message broker has no built-in mechanism for transmitting the remaining parameters, they can be obtained from either the incoming message's headers or its body. It all depends on your implementation of the publish method.
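As an illustration of reading the optional fields from the body, here is a sketch that assumes the publisher wraps the payload in a JSON envelope with data, headers and reply_to keys. SketchMessage and parse_message are hypothetical names, and the envelope layout is an assumption that must match whatever your publish method sends.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchMessage:
    """Hypothetical stand-in for the parsed message model."""

    body: bytes        # required
    raw_message: Any   # required
    content_type: str = ""
    reply_to: str = ""
    headers: Dict[str, Any] = field(default_factory=dict)


def parse_message(raw: bytes) -> SketchMessage:
    try:
        envelope = json.loads(raw)
    except ValueError:
        envelope = None

    # Not our envelope: treat the whole payload as the body
    if not isinstance(envelope, dict) or "data" not in envelope:
        return SketchMessage(body=raw, raw_message=raw)

    headers = envelope.get("headers") or {}
    return SketchMessage(
        body=str(envelope["data"]).encode(),
        raw_message=raw,
        content_type=headers.get("content-type", ""),
        reply_to=envelope.get("reply_to", ""),
        headers=headers,
    )
```

A payload that is not a JSON envelope still yields a valid message, because only body and raw_message are required.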
Everything is relatively simple here: if the message broker supports the ack and nack mechanisms, we should handle them here. Response publishing should also be implemented here to support RPC over MQ. If the broker does not support confirmation of message processing, we simply execute our handler.
Here, for example, is an option with message status processing:
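The sketch below is a self-contained illustration of that idea rather than Propan's own code. FakeIncoming, process_message and publish_reply are hypothetical names; a real client library provides its own message object and its own ack and nack calls.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


class FakeIncoming:
    """Hypothetical incoming message that records its confirmation status."""

    def __init__(self, body: bytes, reply_to: str = "") -> None:
        self.body = body
        self.reply_to = reply_to
        self.status = "pending"

    async def ack(self) -> None:
        self.status = "acked"

    async def nack(self) -> None:
        self.status = "nacked"


def process_message(
    func: Callable[[bytes], Awaitable[Any]],
    publish_reply: Callable[[Any, str], Awaitable[None]],
) -> Callable[[FakeIncoming], Awaitable[Any]]:
    async def wrapper(message: FakeIncoming) -> Any:
        try:
            result = await func(message.body)
        except Exception:
            # Processing failed: tell the broker, then re-raise the error
            await message.nack()
            raise

        await message.ack()

        # RPC over MQ: send the handler result back if a reply is expected
        if message.reply_to and result is not None:
            await publish_reply(result, message.reply_to)
        return result

    return wrapper


async def demo():
    replies: List[Tuple[str, Any]] = []

    async def publish_reply(result: Any, reply_to: str) -> None:
        replies.append((reply_to, result))

    async def ok(body: bytes) -> bytes:
        return body.upper()

    async def bad(body: bytes) -> bytes:
        raise RuntimeError("boom")

    good = FakeIncoming(b"ping", reply_to="answers")
    await process_message(ok, publish_reply)(good)

    failed = FakeIncoming(b"ping")
    try:
        await process_message(bad, publish_reply)(failed)
    except RuntimeError:
        pass
    return good.status, failed.status, replies
```

Whether a failed message should be rejected, requeued or retried is a broker-specific decision; this sketch only marks it as nacked.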
The last step is to implement a method for sending messages. This can be either the simplest stage (if we don't want to or can't implement RPC right now) or the most complex and creative one.
In the example below, I will omit the implementation of RPC, since each broker needs its own implementation. We will just send messages here.
    from typing import Optional, Dict, Any

    from propan.types import SendableMessage
    from propan.brokers._model import BrokerAsyncUsecase
    from propan.brokers.redis.schemas import RedisMessage


    class RedisProcess(BrokerAsyncUsecase):
        ...

        async def publish(
            self,
            message: SendableMessage = "",
            channel: str = "",
            *,
            reply_to: str = "",
            headers: Optional[Dict[str, Any]] = None,
        ) -> None:
            if self._connection is None:
                raise ValueError("Redis connection not established yet")

            msg, content_type = self._encode_message(message)

            await self._connection.publish(
                channel,
                RedisMessage(
                    data=msg,
                    headers={
                        "content-type": content_type or "",
                        **(headers or {}),
                    },
                    reply_to=reply_to,
                ).json(),
            )
Congratulations, after implementing all these methods, you will have a broker capable of correctly sending and receiving messages.
All custom arguments passed to the _wrap_handler function will be further passed to your _get_log_context method.
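The forwarding of custom arguments can be pictured with the following sketch. The SketchBroker class, the channel argument and the shape of the returned context are assumptions made for illustration and do not reproduce the parent class's real logic.

```python
from typing import Any, Callable, Dict, List


class SketchBroker:
    """Hypothetical stand-in showing how custom arguments reach the log context."""

    def __init__(self) -> None:
        self.logged: List[Dict[str, Any]] = []

    def _get_log_context(self, message: bytes, channel: str = "") -> Dict[str, Any]:
        # Broker-specific fields that the log format can refer to
        return {"channel": channel, "size": len(message)}

    def _wrap_handler(
        self, func: Callable[[bytes], Any], **broker_args: Any
    ) -> Callable[[bytes], Any]:
        def wrapped(message: bytes) -> Any:
            # The custom arguments given at registration are forwarded here
            context = self._get_log_context(message, **broker_args)
            self.logged.append(context)
            return func(message)

        return wrapped
```

Calling broker._wrap_handler(func, channel="test") in this sketch produces a handler that records {"channel": "test", "size": ...} for every incoming message.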
Now your broker not only sends and receives messages, but also logs incoming messages in its own format. Congratulations, you are breathtaking!
Success
If you have implemented a broker for your own message source, I am waiting for your PR! I am ready to help you with testing, implementation of specific parts, documentation, and everything else. Your work will definitely become a part of Propan.