
Custom Serialization

By default, Propan uses the JSON format to send and receive messages. However, if you need to handle messages in other formats or with extra serialization steps such as gzip, Avro, Protobuf, and the like, you can modify the serialization logic.

Serialization Steps

Before the message gets into your handler, Propan applies 2 functions to it sequentially: parse_message and decode_message. You can modify one or both stages depending on your needs.
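As a rough illustration of how the two stages chain together, here is a minimal, self-contained sketch. It does not import Propan: StandInMessage is a hypothetical stand-in for PropanMessage, and consume is an assumed name for the glue code that the framework would normally run for you.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StandInMessage:
    """Hypothetical stand-in for PropanMessage; not the real class."""
    body: bytes
    raw_message: Any
    content_type: Optional[str] = None


async def parse_message(raw: bytes) -> StandInMessage:
    # Stage 1: wrap the broker-specific object; the body stays raw bytes.
    return StandInMessage(body=raw, raw_message=raw, content_type="application/json")


async def decode_message(message: StandInMessage) -> Any:
    # Stage 2: turn the raw body into the value the handler receives.
    if message.content_type is not None:
        return json.loads(message.body.decode())
    return message.body


async def handler(body: dict) -> str:
    return f"hello, {body['name']}"


async def consume(raw: bytes) -> str:
    # The two stages run one after another, then the handler is called.
    parsed = await parse_message(raw)
    decoded = await decode_message(parsed)
    return await handler(decoded)


result = asyncio.run(consume(b'{"name": "john"}'))
print(result)
```

Replacing either function changes only its own stage, which is why the two can be overridden independently.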

Message Parsing

At this stage, Propan converts the incoming message object of the library used to work with the broker into a common representation, PropanMessage. The message body remains raw bytes at this point.

The signature of the function depends on the broker library, because each one delivers a different message type:

Message as raw bytes:

from propan import PropanMessage

async def parse_message(
    message: bytes
) -> PropanMessage[bytes]:
    return PropanMessage(
        body=message,
        raw_message=message,
    )

RabbitMQ (aio_pika):

from aio_pika.message import IncomingMessage
from propan import PropanMessage

async def parse_message(
    message: IncomingMessage
) -> PropanMessage[IncomingMessage]:
    return PropanMessage(
        body=message.body,
        headers=message.headers,
        reply_to=message.reply_to or "",
        message_id=message.message_id,
        content_type=message.content_type or "",
        raw_message=message,
    )

Kafka (aiokafka):

from aiokafka.structs import ConsumerRecord
from propan import PropanMessage

async def parse_message(
    message: ConsumerRecord
) -> PropanMessage[ConsumerRecord]:
    headers = {i: j.decode() for i, j in message.headers}
    return PropanMessage(
        body=message.value,
        raw_message=message,
        message_id=f"{message.offset}-{message.timestamp}",
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        headers=headers,
    )

SQS (message as a dictionary):

from typing import Dict, Any
from propan import PropanMessage

async def parse_message(
    message: Dict[str, Any],
) -> PropanMessage[Dict[str, Any]]:
    attributes = message.get("MessageAttributes", {})
    headers = {i: j.get("StringValue") for i, j in attributes.items()}
    return PropanMessage(
        body=message.get("Body", "").encode(),
        message_id=message.get("MessageId"),
        content_type=headers.pop("content-type", None),
        reply_to=headers.pop("reply_to", None) or "",
        headers=headers,
        raw_message=message,
    )

NATS:

from nats.aio.msg import Msg
from propan import PropanMessage

async def parse_message(
    message: Msg
) -> PropanMessage[Msg]:
    return PropanMessage(
        body=message.data,
        content_type=message.header.get("content-type", ""),
        headers=message.header,
        reply_to=message.reply,
        raw_message=message,
    )

This stage depends heavily on the broker in use, and in most cases you do not need to redefine it.

However, it is still possible. You can override this method both for the entire broker and for individual handlers:

from propan import RabbitBroker

async def custom_parse(msg, original_parser):
    # original_parser is a coroutine function, so its result must be awaited
    return await original_parser(msg)

broker = RabbitBroker(parse_message=custom_parse)

@broker.handle("test", parse_message=custom_parse)
async def handler(): ...

Your function should take 2 arguments: the "raw" message itself and the original parser function. Thus, you can either redefine the message parsing logic completely, or modify the message partially and then pass it to the original Propan mechanism.

The parser declared at the broker level is applied to all handlers. A parser declared at the handler level applies only to that handler and replaces the broker-level parser for it, if one was specified.
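The "partially modify, then reuse the original mechanism" option can be sketched as follows. This example is self-contained and does not import Propan: StandInMessage and original_parser are hypothetical stand-ins for PropanMessage and the broker's default parser, and the x-parsed-by header is invented purely for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict


@dataclass
class StandInMessage:
    """Hypothetical stand-in for PropanMessage; not the real class."""
    body: bytes
    raw_message: Any
    headers: Dict[str, str] = field(default_factory=dict)


async def original_parser(raw: bytes) -> StandInMessage:
    # Stand-in for the broker's default parser.
    return StandInMessage(body=raw, raw_message=raw)


async def custom_parse(
    raw: bytes,
    original: Callable[[bytes], Awaitable[StandInMessage]],
) -> StandInMessage:
    # Partial override: reuse the default parser, then adjust its result.
    message = await original(raw)
    message.headers.setdefault("x-parsed-by", "custom_parse")  # hypothetical header
    return message


parsed = asyncio.run(custom_parse(b"ping", original_parser))
print(parsed.headers)
```

A full override would simply ignore the second argument and build the message object itself.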

Message Decoding

At this stage, the body of PropanMessage is transformed into the form in which it enters your handler function. You will need to redefine this stage more often than parsing.

The default implementation is quite simple (this is a simplified version):

import json
from propan import PropanMessage

async def decode_message(message: PropanMessage):
    body = message.body
    if message.content_type is not None:
        return json.loads(body.decode())
    else:
        return body

You can redefine it in the same way as the parser:

from propan import RabbitBroker

async def custom_decode(msg, original_decoded):
    # original_decoded is a coroutine function, so its result must be awaited
    return await original_decoded(msg)

broker = RabbitBroker(decode_message=custom_decode)

@broker.handle("test", decode_message=custom_decode)
async def handler(): ...
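As an example of an extra serialization step, a decoder for gzip-compressed JSON could decompress the body and then hand the message to the original decoder. The sketch below is self-contained and does not import Propan: StandInMessage and original_decoder are hypothetical stand-ins, and it assumes that the body attribute of the real message object can be reassigned in the same way.

```python
import asyncio
import gzip
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class StandInMessage:
    """Hypothetical stand-in for PropanMessage; not the real class."""
    body: bytes
    content_type: Optional[str] = None


async def original_decoder(message: StandInMessage) -> Any:
    # Stand-in that mirrors the simplified default decoder.
    if message.content_type is not None:
        return json.loads(message.body.decode())
    return message.body


async def gzip_decode(
    message: StandInMessage,
    original: Callable[[StandInMessage], Awaitable[Any]],
) -> Any:
    # Extra step: decompress the body, then let the original decoder finish.
    message.body = gzip.decompress(message.body)
    return await original(message)


payload = gzip.compress(json.dumps({"name": "john", "age": 25}).encode())
decoded = asyncio.run(
    gzip_decode(StandInMessage(payload, "application/json"), original_decoder)
)
print(decoded)
```

With a real broker, a function like gzip_decode would be passed as decode_message, and the publisher would need to compress the body before sending it.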

Example with Protobuf

In this section, we will look at an example using Protobuf. The same approach applies to any other serialization method.

Protobuf

Protobuf is an alternative message serialization method commonly used in gRPC. Its main advantage is a much smaller message size than JSON [1], but it requires a message schema (.proto files) on both the client side and the server side.

To begin with, install the dependencies:

pip install grpcio-tools

Then describe the schema of our message:

message.proto
syntax = "proto3";

message Person {
    string name = 1;
    float age = 2;
}

Now generate a Python class for working with messages in the Protobuf format:

python -m grpc_tools.protoc --python_out=. --pyi_out=. -I . message.proto

This produces 2 files: message_pb2.py and message_pb2.pyi. Now we are ready to use the generated class to serialize our messages.

from message_pb2 import Person

from propan import PropanApp, RabbitBroker
from propan.annotations import Logger, NoCast
from propan.brokers.rabbit import RabbitMessage

broker = RabbitBroker()
app = PropanApp(broker)

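async def decode_message(msg: RabbitMessage, original) -> Person:
    # The default decoder (original) is not used here:
    # the raw body is parsed as a Protobuf Person message instead.
    decoded = Person()
    decoded.ParseFromString(msg.body)
    return decoded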

@broker.handle("test", decode_message=decode_message)
async def consume(body: NoCast[Person], logger: Logger):
    logger.info(body)

@app.after_startup
async def publish():
    body = Person(name="john", age=25).SerializeToString()
    await broker.publish(body, "test")

Note that we used the NoCast annotation, which excludes the message body from the pydantic model of our handler, so it is passed to the function without casting:

async def consume(body: NoCast[Person], logger: Logger):

  1. For example, a message like { "name": "john", "age": 25 } takes 27 bytes in JSON and 11 bytes in Protobuf. With lists and more complex structures, the savings can be even larger (up to 20 times).
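The byte counts in this note can be checked without the generated class by encoding the two fields by hand. This is only a sketch of the Protobuf wire format for this specific message (a string in field 1 and a 32-bit float in field 2), not a general encoder.

```python
import json
import struct

# JSON encoding with the default separators of json.dumps.
json_bytes = json.dumps({"name": "john", "age": 25}).encode()

# Field 1 (name): tag byte (field number 1, wire type 2 = length-delimited),
# then a one-byte length (valid only for lengths below 128), then UTF-8 bytes.
name = "john".encode()
name_field = bytes([(1 << 3) | 2, len(name)]) + name

# Field 2 (age): tag byte (field number 2, wire type 5 = 32-bit),
# then the float as 4 little-endian bytes.
age_field = bytes([(2 << 3) | 5]) + struct.pack("<f", 25.0)

proto_bytes = name_field + age_field
print(len(json_bytes), len(proto_bytes))  # prints: 27 11
```

Most of the saving comes from replacing the field names and punctuation with one-byte tags.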