Skip to content

Сериализация

По умолчанию для отправки и приема сообщений Propan использует формат JSON. Однако, если у вас возникает необходимость обрабатывать сообщения других форматов или с дополнительным шагами для сериализации (gzip, Avro, Protobuf, и т.д.), вы можете модицифицировать логику сериализации сообщений.

Шаги по сериализации

До того, как сообщение попадет в вашу функцию, Propan применяет к нему последовательно 2 функции: parse_message и decode_message. Вы можете модицировать один или оба этапа в зависимости от ваших потребностей.

Message Parsing

На этом этапе Propan сериализует входящие сообщение того фреймворка, который используется для работы с брокером в общее представление - PropanMessage. На этом этапе тело сообщения остается в виде сырых байтов.

Сигнатура функции выглядит следующим образом:

1
2
3
4
5
6
7
8
9
from propan import PropanMessage

async def parse_message(
    message: bytes
) -> PropanMessage[bytes]:
    return PropanMessage(
        body=message,
        raw_message=message,
    )
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from aio_pika.message import IncomingMessage
from propan import PropanMessage

async def parse_message(
    message: IncomingMessage
) -> PropanMessage[IncomingMessage]:
    return PropanMessage(
        body=message.body,
        headers=message.headers,
        reply_to=message.reply_to or "",
        message_id=message.message_id,
        content_type=message.content_type or "",
        raw_message=message,
    )
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from aiokafka.structs import ConsumerRecord
from propan import PropanMessage

async def parse_message(
    message: ConsumerRecord
) -> PropanMessage[ConsumerRecord]:
    headers = {i: j.decode() for i, j in message.headers}
    return PropanMessage(
        body=message.value,
        raw_message=message,
        message_id=f"{message.offset}-{message.timestamp}",
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        headers=headers,
    )
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from typing import Dict, Any
from propan import PropanMessage

async def parse_message(
    message: Dict[str, Any],
) -> PropanMessage[Dict[str, Any]]:
    attributes = message.get("MessageAttributes", {})
    headers = {i: j.get("StringValue") for i, j in attributes.items()}
    return PropanMessage(
        body=message.get("Body", "").encode(),
        message_id=message.get("MessageId"),
        content_type=headers.pop("content-type", None),
        reply_to=headers.pop("reply_to", None) or "",
        headers=headers,
        raw_message=message,
    )
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from nats.aio.msg import Msg
from propan import PropanMessage

async def parse_message(
    message: Msg
) -> PropanMessage[Msg]:
    return PropanMessage(
        body=message.data,
        content_type=message.header.get("content-type", ""),
        headers=message.header,
        reply_to=message.reply,
        raw_message=message,
    )

Этот этап сильно связан с особенностями используемого брокера и в большинстве случаев его переопределение не понадобиться.

Однако, такая возможность все-таки есть. Вы можете переопределить этот метод как для всего брокера, так и для отдельных обработчиков:

1
2
3
4
5
6
7
8
9
from propan import RabbitBroker

async def custom_parse(msg, original_parser):
    return original_parser(msg)

broker = RabbitBroker(parse_message=custom_parse)

@broker.handle("test", parse_message=custom_parse)
async def handler(): ...

Ваша функция должна принимать 2 аргумента: само "сырое" сообщение и оригинальную функцию-обработчик. Таким образом, вы можете как полностью переопределить логику парсинга сообщения, так и модицифировать сообщение частично, а затем использовать оригинальный механизм Propan.

Парсер, объявленный на уровне broker будет применяться ко всем обработчиком. Парсер, объявленный на уровне handle - применяется только к этому обработчику (при этом он игнорирует парсер broker'а, если тот был указан ранее).

Message Decoding

На этом этапе тело PropanMessage приводится к тому виду, в каком оно попадает в вашу функцию-обработчик. Именно этот метод вам придется перепределять чаще всего.

В оригинале его сигнатуреа достаточно проста (хотя это и несколько упрощенный вариант):

1
2
3
4
5
6
7
8
9
import json
from propan import PropanMessage

async def decode_message(message: PropanMessage):
    body = message.body
    if message.content_type is not None:
        return json.loads(body.decode())
    else:
        return body

Для его переопределения используется тот же самый механизм, что и для парсера:

1
2
3
4
5
6
7
8
9
from propan import RabbitBroker

async def custom_decode(msg, original_decoded):
    return original_decoded(msg)

broker = RabbitBroker(decode_message=custom_decode)

@broker.handle("test", decode_message=custom_decode)
async def handler(): ...

Пример с Protobuf

В данном разделе мы рассмотрим пример с использование Protobuf, однако, он применим также и для любых других способов сериализации.

Protobuf

Protobuf - альтернативный способ сериализации сообщений, обычно используемый в GRPC. Основное его преимущество - значительно меньший1 размер сообщений (по сравнений с JSON), однако он требует наличие схемы сообщений (.proto файлов) как на стороне клиента, так и на стороне сервера.

Для начала установим зависимости:

pip install grpcio-tools

Затем опишем схему нашего сообщения

message.proto
syntax = "proto3";

message Person {
    string name = 1;
    float age = 2;
}

Теперь сгенерируем Python-класс для работы с сообщениями в формате Protobuf

python -m grpc_tools.protoc --python_out=. --pyi_out=. -I . message.proto

На выходе мы получаем 2 файла: message_pb2.py и message_pb2.pyi. Теперь мы готов с использовать сгенерированный класс для сериализации наших сообщений.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from message_pb2 import Person

from propan import PropanApp, RabbitBroker
from propan.annotations import Logger, NoCast
from propan.brokers.rabbit import RabbitMessage

broker = RabbitBroker()
app = PropanApp(broker)

async def decode_message(msg: RabbitMessage, original) -> Person:
    decoded = Person()
    decoded.ParseFromString(msg.body)
    return decoded

@broker.handle("test", decode_message=decode_message)
async def consume(body: NoCast[Person], logger: Logger):
    logger.info(body)

@app.after_startup
async def publish():
    body = Person(name="john", age=25).SerializeToString()
    await broker.publish(body, "test")

Обратите внимание, что мы использовали аннотацию NoCast, которая исключает сообщение из pydantic-представления нашего обработчика.

async def consume(body: NoCast[Person], logger: Logger):

  1. Например, сообщение вида { "name": "john", "age": 25 } в JSON занимает 27 байт, а в Protobuf - 11. Со списками и более сложными структурами экономия может быть еще более значительной (до 20х раз).