Skip to content

FastAPI Plugin

Прием сообщений

Propan может использоваться как полноценная часть FastAPI.

Для этого просто импортируйте нужный вам PropanRouter и объявите обработчик сообщений с помощью декоратора @event. Этот декоратор аналогичен декоратору @handle у соответсвующих брокеров.

Tip

При использовании таким образом Propan не использует собственную систему зависимостей, а интегрируется в FastAPI. Т.е. вы можете использовать Depends, BackgroundTasks и прочие инструменты FastAPI так, если бы это был обычный HTTP-endpoint.

Обратите внимание, что в коде ниже используется fastapi.Depends, а не propan.Depends.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Redis!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Rabbit!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Kafka!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, SQS!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Nats!" }

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app.include_router(router)

При обработке сообщения из брокера все тело сообщения помещается одновременно и в body, и в path параметры запроса: вы можете достать получить к ним доступ любым удобным для вас способом. Заголовок сообщения помещается в headers.

Также этот роутер может полноценно использоваться как HttpRouter (наследником которого он и является). Поэтому вы можете объявлять с его помощью любые get, post, put и прочие HTTP методы. Как например, это сделано в строке 19.

Warning

Если ваш ASGI сервер не поддерживает установку state внутри lifespan, вы можете отключить это поведение следующим образом:

router = PropanRouter(..., setup_state=False)
Однако, после этого вы не сможете получить доступ к брокеру из state вашего приложения (но он все еще доступен как поле router.broker)

Отправка сообщений

Внутри каждого роутера есть соответсвующий брокер. Вы можете легко получить к нему доступ, если вам необходимо отправить сообщение в MQ.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Redis!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Rabbit!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, SQS!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from fastapi import FastAPI
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

@router.get("/")
async def hello_http():
    await router.broker.publish("Hello, Nats!", "test")
    return "Hello, HTTP!"

app.include_router(router)

Вы можете оформить доступ к брокеру в виде Depends, если хотите использовать его в разных частях вашей программы.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import RedisBroker
from propan.fastapi import RedisRouter
from typing_extensions import Annotated

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[RedisBroker, Depends(broker)]):
    await broker.publish("Hello, Redis!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import RabbitBroker
from propan.fastapi import RabbitRouter
from typing_extensions import Annotated

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[RabbitBroker, Depends(broker)]):
    await broker.publish("Hello, Rabbit!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import KafkaBroker
from propan.fastapi import KafkaRouter
from typing_extensions import Annotated

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[KafkaBroker, Depends(broker)]):
    await broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import SQSBroker
from propan.fastapi import SQSRouter
from typing_extensions import Annotated

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[SQSBroker, Depends(broker)]):
    await broker.publish("Hello, SQS!", "test")
    return "Hello, HTTP!"

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from fastapi import FastAPI, Depends
from propan import NatsBroker
from propan.fastapi import NatsRouter
from typing_extensions import Annotated

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

def broker():
    return router.broker

@router.get("/")
async def hello_http(broker: Annotated[NatsBroker, Depends(broker)]):
    await broker.publish("Hello, Nats!", "test")
    return "Hello, HTTP!"

app.include_router(router)

Либо вы можете получить доступ к брокеру из контекста приложения FastAPI

@app.get("/")
def main(request: Request):
    broker = request.state.broker

@after_startup

Приложение PropanApp имеет хук after_startup, который позволяет вам осуществлять операции с вашим брокером сообщений после того, как соединение с ним будет установлено. Это может быть крайне удобно для управление объектами вашего брокера и/или отправки сообщений. Этот хук также доступен и для ваших FastAPI PropanRouter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import SQSRouter

router = SQSRouter("http://localhost:9324")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from propan.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)

@router.after_startup
def do_smth(app: FastAPI):
    ...

@router.after_startup
async def publish_smth(app: FastAPI):
    await router.broker.publish(...)

app.include_router(router)

Tip

Если ваш хук возвращает dict значение, оно также будет включено в глобальный контекст приложения FastAPI (аналогично yield в lifespan)

Документация

При использовании Propan в качестве роутера для FastAPI, фреймворк автоматически регистрирует эндпоинты для хостинга AsyncAPI документации в ваше приложение со следующими значениями по умолчанию:

1
2
3
4
5
6
from propan.fastapi import RabbitRouter

router = RabbitRouter(
    schema_url="/asyncapi",
    include_in_schema=True,
)