Skip to content

Propan logo

Tests coverage Coverage Package version downloads
Supported Python versions GitHub Discord

Propan

Propan - just another HTTP a declarative Python Messaging framework.

Inspired by FastAPI and Kombu, Propan was created to simplify Message Brokers' code writing and to provide a helpful development toolkit, which existed only in HTTP-frameworks world until now.

It's designed to create reactive microservices around Messaging.

It is a modern, high-level framework on top of popular specific Python brokers libraries, based on pydantic, FastAPI, and pytest concepts.


⚠⚠⚠ Deprecation notice ⚠⚠⚠

Propan project is superceeded by FastStream.

FastStream is a new package based on the ideas and experiences gained from FastKafka and Propan. By joining our forces, we picked up the best from both packages and created a unified way to write services capable of processing streamed data regardless of the underlying protocol.

I’ll continue to maintain Propan package, but new development will be in FastStream. If you are starting a new service, FastStream is the recommended way to do it.

For now FastStream supports Kafka and RabbitMQ. Other brokers support will be added in a few months.

You can find a detail migration guide in the documentation


The key features are

  • Simple: Designed to be easy to use and learn.
  • Intuitive: Great editor support. Autocompletion everywhere.
  • Dependencies management: Minimization of code duplication. Access to dependencies at any level of the call stack.
  • Integrations: Propan is fully compatible with any HTTP framework you want
  • MQ independent: Single interface to popular MQ:
  • RPC: The framework supports RPC requests on top of message brokers, which will allow performing long operations on remote services asynchronously.
  • Great to develop: CLI tool provides great development experience:
    • framework-independent way to manage the project environment
    • application code hot reload
    • robust application templates
  • Documentation: Propan automatically generates and presents an interactive AsyncAPI documentation for your project
  • Testability: Propan allows you to test your app without external dependencies: you do not have to set up a Message Broker, you can use a virtual one!

Declarative

With declarative tools you can define what you need to get. With traditional imperative tools you must write what you need to do.

Take a look at classic imperative tools, such as aio-pika, pika, redis-py, nats-py, aiokafka, etc.

This is the Quickstart with the aio-pika:

import asyncio
import aio_pika

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    queue_name = "test_queue"

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue(queue_name)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(message.body)

asyncio.run(main())

aio-pika is a great tool with a really easy learning curve. But it's still imperative. You need to connect, declare channel, queues, exchanges by yourself. Also, you need to manage connection, message, queue context to avoid any troubles.

It is not a bad way, but it can be much easier.

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

app = PropanApp(broker)

@broker.handle("test_queue")
async def base_handler(body):
    print(body)

This is the Propan declarative way to write the same code. That is so much easier, isn't it?


Type casting

Propan uses pydantic to cast incoming function arguments to types according to their annotation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from pydantic import BaseModel

...

class SimpleMessage(BaseModel):
    key: int

@broker.handle("test2")
async def second_handler(body: SimpleMessage):
    assert isinstance(body.key, int)

Dependencies

Propan has a dependencies management policy close to pytest fixtures and FastAPI Depends at the same time. Function arguments declare which dependencies you want are needed, and a special decorator delivers them from the global Context object.

Already declared context fields are: app, broker, context (itself), logger and message. If you call a non-existent field, raises pydantic.error_wrappers.ValidationError value.

But you can specify your own dependencies, call dependencies functions and more.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from logging import Logger
from propan import Context, Depends

...

async def base_dep(user_id: int) -> bool:
    return True

@broker.handle("test")
async def base_handler(user_id: int,
                       dep: bool = Depends(base_dep),
                       logger: Logger = Context()):
    assert dep is True
    logger.info(body)

Project Documentation

Propan automatically generates documentation for your project according to the AsyncAPI specification. You can work with both generated artifacts and place a Web view of your documentation on resources available to related teams.

The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message format the application works with. And most importantly, it won't cost anything - Propan has already created the docs for you!

HTML-page


HTTP Frameworks integrations

Any Framework

You can use Propan MQBrokers without PropanApp. Just start and stop them according to your application lifespan.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import RedisBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = RedisBroker("redis://localhost:6379")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import RabbitBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import KafkaBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = KafkaBroker("localhost:9092")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import SQSBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = SQSBroker("http://localhost:9324", ...)

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from propan import NatsBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = NatsBroker("nats://localhost:4222")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()

FastAPI Plugin

Also, Propan can be used as part of FastAPI.

Just import a PropanRouter you need and declare the message handler using the @event decorator. This decorator is similar to the decorator @handle for the corresponding brokers.

Tip

When used this way, Propan does not utilize its own dependency system, but integrates into FastAPI. That is, you can use Depends, BackgroundTasks and other tools FastAPI as if it were a regular HTTP endpoint.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, world!"}

app.include_router(router)

Note

More integration examples can be found here


Supported MQ brokers

Need your help

The framework is now in active development. We have a very long list of what has yet to be implemented and various brokers are only part of it. If you want to implement something from this list or help in any other way, take a look here

async sync
RabbitMQ ✔ stable ✔ 🛠 WIP 🛠
Redis ✔ stable ✔ 🔍 planning 🔍
Nats ✔ stable ✔ 🔍 planning 🔍
Kafka ⚠ beta ⚠ 🔍 planning 🔍
SQS ⚠ beta ⚠ 🔍 planning 🔍
NatsJS ⚠ beta ⚠ 🔍 planning 🔍
ZeroMQ 🛠 WIP 🛠 🔍 planning 🔍
MQTT 🔍 planning 🔍 🔍 planning 🔍
Redis Streams 🔍 planning 🔍 🔍 planning 🔍
Pulsar 🔍 planning 🔍 🔍 planning 🔍
ActiveMQ 🔍 planning 🔍 🔍 planning 🔍
AzureSB 🔍 planning 🔍 🔍 planning 🔍