INTEGRATIONS¶
Propan brokers are very easy to integrate with any of your applications: it is enough to initialize the broker at startup and close it correctly at the end of your application.
Most HTTP frameworks have built-in lifecycle hooks for this.
Tip
If you want to use Propan in conjunction with FastAPI, perhaps you should use a special plugin
from contextlib import asynccontextmanager
from fastapi import FastAPI
from propan import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
await broker.start()
yield
await broker.close()
@broker.handle("test")
async def base_handler(body):
print(body)
@app.get("/")
def read_root():
return {"Hello": "World"}
from aiohttp import web
from propan import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
@broker.handle("test")
async def base_handler(body):
print(body)
async def start_broker(app):
await broker.start()
async def stop_broker(app):
await broker.close()
async def hello(request):
return web.Response(text="Hello, world")
app = web.Application()
app.add_routes([web.get("/", hello)])
app.on_startup.append(start_broker)
app.on_cleanup.append(stop_broker)
if __name__ == "__main__":
web.run_app(app)
from blacksheep import Application
from propan import RabbitBroker
app = Application()
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
@broker.handle("test")
async def base_handler(body):
print(body)
@app.on_start
async def start_broker(application: Application) -> None:
await broker.start()
@app.on_stop
async def stop_broker(application: Application) -> None:
await broker.close()
@app.route("/")
async def home():
return "Hello, World!"
import falcon
import falcon.asgi
from propan import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
@broker.handle("test")
async def base_handler(body):
print(body)
class ThingsResource:
async def on_get(self, req, resp):
resp.status = falcon.HTTP_200
resp.content_type = falcon.MEDIA_TEXT
resp.text = (
"\nTwo things awe me most, the starry sky "
"above me and the moral law within me.\n"
"\n"
" ~ Immanuel Kant\n\n"
)
class PropanMiddleware:
async def process_startup(self, scope, event):
await broker.start()
async def process_shutdown(self, scope, event):
await broker.close()
app = falcon.asgi.App()
app.add_middleware(PropanMiddleware())
app.add_route("/things", ThingsResource())
from quart import Quart
from propan import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = Quart(__name__)
@broker.handle("test")
async def base_handler(body):
print(body)
@app.before_serving
async def start_broker():
await broker.start()
@app.after_serving
async def stop_broker():
await broker.close()
@app.route("/")
async def json():
return {"hello": "world"}
from sanic import Sanic
from sanic.response import text
from propan import RabbitBroker
app = Sanic("MyHelloWorldApp")
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
@broker.handle("test")
async def base_handler(body):
print(body)
@app.after_server_start
async def start_broker(app, loop):
await broker.start()
@app.after_server_stop
async def stop_broker(app, loop):
await broker.close()
@app.get("/")
async def hello_world(request):
return text("Hello, world.")
However, even if such a hook is not provided, you can do it yourself.
import asyncio
import tornado.web
from propan import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
@broker.handle("test")
async def base_handler(body):
print(body)
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
def make_app():
return tornado.web.Application(
[
(r"/", MainHandler),
]
)
async def main():
app = make_app()
app.listen(8888)
await broker.start()
try:
await asyncio.Event().wait()
finally:
await broker.close()
if __name__ == "__main__":
asyncio.run(main())