Skip to content

FastAPI Plugin

Handle messages

Propan can be used as a part of FastAPI.

Just import a PropanRouter you need and declare the message handler using the @event decorator. This decorator is similar to the decorator @handle for the related brokers.

Tip

When used in this way, Propan does not use its own dependency system, but integrates into FastAPI. That is, you can use Depends, BackgroundTasks and other FastAPI tools as if it were a regular HTTP endpoint.

Note that the code below uses fastapi.Depends, not propan.Depends.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Redis!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Rabbit!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Kafka!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, SQS!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Nats!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)

When processing a message from a broker, the entire message body is placed simultaneously in both the body and path request parameters: you can get access to them in any way convenient for you. The message header is placed in headers.

Also, this router can be fully used as an HttpRouter (of which it is the inheritor). So you can use it to declare any get, post, put and other HTTP methods. For example, this is done at 19 line.

Warning

If your ASGI server does not support installing state inside lifespan, you can disable this behavior as follows:

router = PropanRouter(..., setup_state=False)
However, after that you will not be able to access the broker from your application's state (but it is still available as the router.broker)

Sending messages

Inside each router there is a broker. You can easily access it if you need to send a message to MQ.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Redis!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Rabbit!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, SQS!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Nats!", "test")
    return "Hello, HTTP!"

app.include_router(router)

You can use the following Depends to access the broker if you want to use it at different parts of your program.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import RedisBroker
from propan.fastapi import RedisRouter
from typing_extensions import Annotated

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[RedisBroker, Depends(broker)]):
    await broker.publish("Hello, Redis!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import RabbitBroker
from propan.fastapi import RabbitRouter
from typing_extensions import Annotated

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[RabbitBroker, Depends(broker)]):
    await broker.publish("Hello, Rabbit!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import KafkaBroker
from propan.fastapi import KafkaRouter
from typing_extensions import Annotated

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[KafkaBroker, Depends(broker)]):
    await broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import SQSBroker
from propan.fastapi import SQSRouter
from typing_extensions import Annotated

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[SQSBroker, Depends(broker)]):
    await broker.publish("Hello, SQS!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import NatsBroker
from propan.fastapi import NatsRouter
from typing_extensions import Annotated

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[NatsBroker, Depends(broker)]):
    await broker.publish("Hello, Nats!", "test")
    return "Hello, HTTP!"

app.include_router(router)

Or you can access broker from a FastAPI application state

@app.get("/")
def main(request: Request):
    broker = request.state.broker

@after_startup

The PropanApp application has the after_startup hook, which allows you to perform operations with your message broker after the connection is established. This can be extremely convenient for managing your brokers' objects and/or sending messages. This hook is also available for your FastAPI PropanRouter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)

Documentation

When using Propan as a router for FastAPI, the framework automatically registers endpoints for hosting AsyncAPI documentation into your application with the following default values:

1
2
3
4
5
6
from propan.fastapi import RabbitRouter

router = RabbitRouter(
    schema_url="/asyncapi",
    include_in_schema=True,
)