Перейти к содержанию

Propan logo

Tests coverage Coverage Package version downloads
Supported Python versions GitHub Discord

Propan

Propan - это еще один HTTP декларативный Python MQ фреймворк. Он идет по стопам FastAPI и Kombu, максимально упрощая написание кода и предоставляя все удобства инструментов, которые до этого существовали только в мире HTTP фремворков, однако, создан для работы с различными брокерами сообщений на основе AMQP, MQTT и др. протоколов.

Он идеально подходит для создания реактивных микросервисов на основе архитектуры Messaging.

Это современный, высокоуровневый фреймворк, разработанный на основе популярных python библиотек для работы со специфичными брокерами, а в его основе лежит pydantic, идеи FastAPI and pytest.


Ключевые особенности

  • Простота: спроектирован для максимальной простоты изучения и использования.
  • Интуитивность: Отличная поддержка IDE, автодополнение даже в vim`е.
  • Управление зависимостями: Эффективное переиспользование за счет аннотации типов. Доступ к зависимостями во всем стеке вызова.
  • Интeграция: Propan полностью совместим с любыми HTTP фреймворками
  • Независимость от брокеров: Единый интерфейс для популярных брокеров:
  • RPC: Фреймворк поддерживает RPC запросы поверх брокеров сообщений, что позволит выполнять длительные операции на удаленных сервисах асинхронно.
  • Скорость разработки: собственный CLI инструмент предоставляет отличный опыт разработки:
    • Полностью совместимый с любым фреймворком способ управлять окружением проекта
    • hot reloading при изменениях в коде
    • Готовые шаблоны проекта
  • Документация: Propan автоматически генерирует и представляет интерактивную AsyncAPI документацию для вашего проекта
  • Тестируемость: Propan позволяет тестировать ваше приложение без внешних зависимостей: вам не нужно поднимать брокер сообщений, используйте виртуального!

Декларативность

Декларативные иснтрументы позволяют нам описывать что мы хотим получить, в то время как традиционные императивные инструменты заставляют нас писать что мы хотим сделать.

К традиционным императивным библиотекам относятся aio-pika, pika, redis-py, nats-py, aiokafka и подобные.

Например, это Quickstart из библиотеки aio-pika:

import asyncio
import aio_pika

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    queue_name = "test_queue"

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue(queue_name)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(message.body)

asyncio.run(main())

aio-pika - это действительно отличный инструмент с легкой кривой обучения. Но он все еще императивный. Вам необходимо самому объявлять и инициализировать connect, channel, queue и exchange. Также, вам нужно управлять контекстом вашего connection, message, queue для того, чтобы избежать возможных проблем с обработкой.

Это не плохой способ написания кода, но он может быть проще.

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

app = PropanApp(broker)

@broker.handle("test_queue")
async def base_handler(body):
    print(body)

Это декларативный способ написать тот же код с помощью Propan. Разве это не проще?


Приведение типов

Propan использует pydantic для приведения типов входящих аргументов в соответсвии с их аннотацией.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from pydantic import BaseModel

...

class SimpleMessage(BaseModel):
    key: int

@broker.handle("test2")
async def second_handler(body: SimpleMessage):
    assert isinstance(body.key, int)

Зависимости

Propan имеет систему управления зависимостями, очень близкую к pytest fixtures и FastAPI Depends одновременно. Входящие аргументы функции объявляют, какие зависимости нужны, а декоратор - доставляет эти зависимости из глобального контекста.

По умолчанию, в проекте объявлены следующие зависимости: app, broker, context, logger и message. Вы можете в любой момент расширить этот список, добавив свои зависимости. При попытке доступа к несуществующим зависимостям, вы просто получите None.

Также, вы можете вызывать функции в качестве зависимостей и делать некоторые другие трюки.

Подробнее будет чуть дальше.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from logging import Logger
from propan import Context, Depends

...

async def base_dep(user_id: int) -> bool:
    return True

@broker.handle("test")
async def base_handler(user_id: int,
                       dep: bool = Depends(base_dep),
                       logger: Logger = Context()):
    assert dep is True
    logger.info(body)

Документация Проекта

Propan автоматически генерирует документацию для вашего проекта в соответсвии со спецификацией AsyncAPI. Вы можете работать как со сгенерированным артефактами, так и разместить Web-представление вашей документацие на ресурсах, доступных для смежных команд.

Наличие такой документации существенно упрощает интеграцию сервисов: вы сразу видите, с какими каналами и каким форматом сообщений работает приложение. А самое главное, это не стоит вам ничего - Propan уже сделал все за вас!

HTML-page


Использование с HTTP фреймворками

С любыми фреймворками

Вы можете использовать брокеры Propan без самого Propan приложения. Просто запустите и остановите его вместе с вашим HTTP приложением.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import RedisBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = RedisBroker("redis://localhost:6379")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import RabbitBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import KafkaBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = KafkaBroker("localhost:9092")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import SQSBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = SQSBroker("http://localhost:9324", ...)

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import NatsBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = NatsBroker("nats://localhost:4222")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()

С FastAPI

Также, Propan может использоваться как часть FastAPI.

Для этого просто импортируйте нужный вам PropanRouter и объявите обработчик сообщений с помощью декоратора @event. Этот декоратор аналогичен декоратору @handle у соответсвующих брокеров.

Tip

При использовании таким образом Propan не использует собственную систему зависимостей, а интегрируется в FastAPI. Т.е. вы можете использовать Depends, BackgroundTasks и прочие инструменты FastAPI так, если бы это был обычный HTTP-endpoint.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)

Note

Больше примеров использования с другими фреймворками вы найдете здесь


Поддерживаемые брокеры

Нужна ваша помощь

Фреймоворк сейчас активно развивается. У нас очень длинный список того, что еще предстоит реализовать и различные брокеры - только его часть. Если вы хотите реализовать что-то из этого списка или помочь любым другим способом - загляните сюда

async sync
RabbitMQ ✔ stable ✔ 🛠 WIP 🛠
Redis ✔ stable ✔ 🔍 planning 🔍
Nats ✔ stable ✔ 🔍 planning 🔍
Kafka ⚠ beta ⚠ 🔍 planning 🔍
SQS ⚠ beta ⚠ 🔍 planning 🔍
NatsJS ⚠ beta ⚠ 🔍 planning 🔍
ZeroMQ 🛠 WIP 🛠 🔍 planning 🔍
MQTT 🔍 planning 🔍 🔍 planning 🔍
Redis Streams 🔍 planning 🔍 🔍 planning 🔍
Pulsar 🔍 planning 🔍 🔍 planning 🔍
ActiveMQ 🔍 planning 🔍 🔍 planning 🔍
AzureSB 🔍 planning 🔍 🔍 planning 🔍