FastAPI Plugin
Handle messages
Propan can be used as a part of FastAPI .
Just import a PropanRouter you need and declare the message handler
using the @event
decorator. This decorator is similar to the decorator @handle
for the related brokers.
Tip
When used in this way, Propan does not use its own dependency system, but integrates into FastAPI .
That is, you can use Depends
, BackgroundTasks
and other FastAPI tools as if it were a regular HTTP endpoint.
Note that the code below uses fastapi.Depends
, not propan.Depends
.
Redis RabbitMQ Kafka SQS NATS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import RedisRouter
router = RedisRouter ( "redis://localhost:6379" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Redis!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter
router = RabbitRouter ( "amqp://guest:guest@localhost:5672" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Rabbit!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Kafka!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import SQSRouter
router = SQSRouter ( "http://localhost:9324" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, SQS!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import NatsRouter
router = NatsRouter ( "nats://localhost:4222" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Nats!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
When processing a message from a broker, the entire message body is placed simultaneously in both the body
and path
request parameters: you can get access to them
in any way convenient for you. The message header is placed in headers
.
Also, this router can be fully used as an HttpRouter
(of which it is the inheritor). So you can
use it to declare any get
, post
, put
and other HTTP methods. For example, this is done at 19 line.
Warning
If your ASGI server does not support installing state inside lifespan , you can disable this behavior as follows:
router = PropanRouter ( ... , setup_state = False )
However, after that you will not be able to access the broker from your application's
state (but it is still available as the
router.broker
)
Sending messages
Inside each router there is a broker. You can easily access it if you need to send a message to MQ.
Redis RabbitMQ Kafka SQS NATS
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import RedisRouter
router = RedisRouter ( "redis://localhost:6379" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Redis!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import RabbitRouter
router = RabbitRouter ( "amqp://guest:guest@localhost:5672" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Rabbit!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Kafka!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import SQSRouter
router = SQSRouter ( "http://localhost:9324" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, SQS!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import NatsRouter
router = NatsRouter ( "nats://localhost:4222" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Nats!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
You can use the following Depends
to access the broker if you want to use it at different parts of your program.
Redis RabbitMQ Kafka SQS NATS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import RedisBroker
from propan.fastapi import RedisRouter
from typing_extensions import Annotated
router = RedisRouter ( "redis://localhost:6379" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ RedisBroker , Depends ( broker )]):
await broker . publish ( "Hello, Redis!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import RabbitBroker
from propan.fastapi import RabbitRouter
from typing_extensions import Annotated
router = RabbitRouter ( "amqp://guest:guest@localhost:5672" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ RabbitBroker , Depends ( broker )]):
await broker . publish ( "Hello, Rabbit!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import KafkaBroker
from propan.fastapi import KafkaRouter
from typing_extensions import Annotated
router = KafkaRouter ( "localhost:9092" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ KafkaBroker , Depends ( broker )]):
await broker . publish ( "Hello, Kafka!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import SQSBroker
from propan.fastapi import SQSRouter
from typing_extensions import Annotated
router = SQSRouter ( "http://localhost:9324" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ SQSBroker , Depends ( broker )]):
await broker . publish ( "Hello, SQS!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import NatsBroker
from propan.fastapi import NatsRouter
from typing_extensions import Annotated
router = NatsRouter ( "nats://localhost:4222" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ NatsBroker , Depends ( broker )]):
await broker . publish ( "Hello, Nats!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
Or you can access broker from a FastAPI application state
@app . get ( "/" )
def main ( request : Request ):
broker = request . state . broker
@after_startup
The PropanApp
application has the after_startup
hook, which allows you to perform operations with your message broker after the connection is established. This can be extremely convenient for managing your brokers' objects and/or sending messages. This hook is also available for your FastAPI PropanRouter
Documentation
When using Propan as a router for FastAPI , the framework automatically registers endpoints for hosting AsyncAPI documentation into your application with the following default values:
from propan.fastapi import RabbitRouter
router = RabbitRouter (
schema_url = "/asyncapi" ,
include_in_schema = True ,
)