FastAPI Plugin
Прием сообщений
Propan может использоваться как полноценная часть FastAPI .
Для этого просто импортируйте нужный вам PropanRouter и объявите обработчик сообщений
с помощью декоратора @event
. Этот декоратор аналогичен декоратору @handle
у соответсвующих брокеров.
Tip
При использовании таким образом Propan не использует собственную систему зависимостей, а интегрируется в FastAPI .
Т.е. вы можете использовать Depends
, BackgroundTasks
и прочие инструменты FastAPI так, если бы это был обычный HTTP-endpoint.
Обратите внимание, что в коде ниже используется fastapi.Depends
, а не propan.Depends
.
Redis RabbitMQ Kafka SQS NATS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import RedisRouter
router = RedisRouter ( "redis://localhost:6379" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Redis!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter
router = RabbitRouter ( "amqp://guest:guest@localhost:5672" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Rabbit!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Kafka!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import SQSRouter
router = SQSRouter ( "http://localhost:9324" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, SQS!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from fastapi import Depends , FastAPI
from pydantic import BaseModel
from propan.fastapi import NatsRouter
router = NatsRouter ( "nats://localhost:4222" )
app = FastAPI ( lifespan = router . lifespan_context )
class Incoming ( BaseModel ):
m : dict
def call ():
return True
@router . event ( "test" )
async def hello ( m : Incoming , d = Depends ( call )) -> dict :
return { "response" : "Hello, Nats!" }
@router . get ( "/" )
async def hello_http ():
return "Hello, HTTP!"
app . include_router ( router )
При обработке сообщения из брокера все тело сообщения помещается одновременно и в body
, и в path
параметры запроса: вы можете достать получить к ним доступ любым удобным для вас способом. Заголовок сообщения помещается в headers
.
Также этот роутер может полноценно использоваться как HttpRouter
(наследником которого он и является). Поэтому вы можете
объявлять с его помощью любые get
, post
, put
и прочие HTTP методы. Как например, это сделано в строке 19 .
Warning
Если ваш ASGI сервер не поддерживает установку state внутри lifespan , вы можете отключить это поведение следующим образом:
router = PropanRouter ( ... , setup_state = False )
Однако, после этого вы не сможете получить доступ к брокеру из
state вашего приложения (но он все еще доступен как поле
router.broker
)
Отправка сообщений
Внутри каждого роутера есть соответсвующий брокер. Вы можете легко получить к нему доступ, если вам необходимо отправить сообщение в MQ.
Redis RabbitMQ Kafka SQS NATS
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import RedisRouter
router = RedisRouter ( "redis://localhost:6379" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Redis!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import RabbitRouter
router = RabbitRouter ( "amqp://guest:guest@localhost:5672" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Rabbit!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import KafkaRouter
router = KafkaRouter ( "localhost:9092" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Kafka!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import SQSRouter
router = SQSRouter ( "http://localhost:9324" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, SQS!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13 from fastapi import FastAPI
from propan.fastapi import NatsRouter
router = NatsRouter ( "nats://localhost:4222" )
app = FastAPI ( lifespan = router . lifespan_context )
@router . get ( "/" )
async def hello_http ():
await router . broker . publish ( "Hello, Nats!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
Вы можете оформить доступ к брокеру в виде Depends
, если хотите использовать его в разных частях вашей программы.
Redis RabbitMQ Kafka SQS NATS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import RedisBroker
from propan.fastapi import RedisRouter
from typing_extensions import Annotated
router = RedisRouter ( "redis://localhost:6379" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ RedisBroker , Depends ( broker )]):
await broker . publish ( "Hello, Redis!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import RabbitBroker
from propan.fastapi import RabbitRouter
from typing_extensions import Annotated
router = RabbitRouter ( "amqp://guest:guest@localhost:5672" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ RabbitBroker , Depends ( broker )]):
await broker . publish ( "Hello, Rabbit!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import KafkaBroker
from propan.fastapi import KafkaRouter
from typing_extensions import Annotated
router = KafkaRouter ( "localhost:9092" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ KafkaBroker , Depends ( broker )]):
await broker . publish ( "Hello, Kafka!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import SQSBroker
from propan.fastapi import SQSRouter
from typing_extensions import Annotated
router = SQSRouter ( "http://localhost:9324" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ SQSBroker , Depends ( broker )]):
await broker . publish ( "Hello, SQS!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from fastapi import FastAPI , Depends
from propan import NatsBroker
from propan.fastapi import NatsRouter
from typing_extensions import Annotated
router = NatsRouter ( "nats://localhost:4222" )
app = FastAPI ( lifespan = router . lifespan_context )
def broker ():
return router . broker
@router . get ( "/" )
async def hello_http ( broker : Annotated [ NatsBroker , Depends ( broker )]):
await broker . publish ( "Hello, Nats!" , "test" )
return "Hello, HTTP!"
app . include_router ( router )
Либо вы можете получить доступ к брокеру из контекста приложения FastAPI
@app . get ( "/" )
def main ( request : Request ):
broker = request . state . broker
@after_startup
Приложение PropanApp
имеет хук after_startup
, который позволяет вам осуществлять операции с вашим брокером сообщений после того, как соединение с ним будет установлено. Это может быть крайне удобно для управление объектами вашего брокера и/или отправки сообщений. Этот хук также доступен и для ваших FastAPI PropanRouter
Tip
Если ваш хук возвращает dict
значение, оно также будет включено в глобальный контекст приложения FastAPI (аналогично yield в lifespan )
Документация
При использовании Propan в качестве роутера для FastAPI , фреймворк автоматически регистрирует эндпоинты для хостинга AsyncAPI документации в ваше приложение со следующими значениями по умолчанию:
from propan.fastapi import RabbitRouter
router = RabbitRouter (
schema_url = "/asyncapi" ,
include_in_schema = True ,
)