Skip to content

LIFESPANS

Иногда вам нужно определить логику, которая должна исполняться перед запуском приложения. Это означает, что код будет исполнен один раз - еще до того, как ваше приложение начнет принимать сообщения.

Также, у вас может возникнуть необходимость завершить некоторые процессы после остановки приложения. В этом случае, ваш код также будет выполнен ровно один раз: но уже после завершения работы основного приложения.

Поскольку этот код исполняется перед запуском приложения и после его остановки, он покрывает весь жизненный цикл (lifespan) приложения.

Это может быть очень полезно для инициализации настроек вашего приложения при старте, поднятия пула соединений к базе данных или запуска моделей машинного обучения.

Пример использования

Давайте представим, что ваше приложение использует pydantic в качестве менеджера ваших настроек.

Я крайне рекомендую использовать pydantic для этих целей, т.к. эта зависимость уже используется в Propan и вам не придется устанавливать дополнительный пакет

Также, давайте представим, что у вас есть несколько .env, .env.development, .env.test, .env.production файлов с настройками вашего приложения, и вы хотите переключать их при запуске без изменений в коде.

За счет передачи аргументов командной строки в ваш код Propan позволяет вам с легкостью это сделать.

Lifespan

Давайте напишем немного кода для нашего примера

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from propan import PropanApp, SQSBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = SQSBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "http://localhost:9324"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from propan import PropanApp, RabbitBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = RabbitBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "amqp://guest:guest@localhost:5672/"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from propan import PropanApp, KafkaBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = KafkaBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "localhost:9092"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from propan import PropanApp, RedisBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = RedisBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "redis://localhost:6379"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from propan import PropanApp, NatsBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = NatsBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "nats://localhost:4222"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

Теперь это приложение можно запускать с помощью следующей команды для управления окружением:

propan run serve:app --env .env.test

Детали

Теперь разберемся немного детальнее

Для начала, мы использовали декоратор

12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

для объявления функции, которая должна запускаться при старте нашего приложения

Следующим шагом мы объявили аргументы, которые будет получать наша функция

12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

При этом поле env будет передано в функцию setup из аргументов командой строки

Tip

Функции жизненного цикла по умолчанию используются с декоратором @apply_types, поэтому в них доступны все поля контекста и зависимости

Затем, мы инициализировали настройки нашего приложения с использованием переданного нам из командой строки файла

12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

И поместили эти настройки в глобальный контекст

12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
Note

Теперь мы можем получить доступ к нашим настройкам в любом месте приложения прямо из контекста

from propan import Context, apply_types
@apply_types
async def func(settings = Context()): ...

Последним шагом мы инициализировали нашего брокера: теперь, при старте приложения он будет готов принимать сообщения

12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
12
13
14
15
16
@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

Другой пример

Теперь давайте представим, что у нас есть модель машинного обучения, которая должна обрабатывать сообщения из какого-либо брокера.

Обычна инициализация таких моделей занимает продолжительное время. Разумно будет сделать это при старте приложения, а не при обработке каждого сообщения.

Вы можете инициализировать вашу модель где-то вверху вашего модуля/файла. Однако, в таком случае, этот код будет запущен даже просто в случае импортирования этого модуля, например, при тестировании. Вряд ли вы хотите запускать вашу модель на каждый запуск тестов...

Поэтому, стоит инициализированить модель в хуке @app.on_startup.

Также, мы не хотим, чтобы модель окончила свою работу при остановке приложения некорректно. Чтобы избежать этого, нам понадобиться хук @app.on_shutdown

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, Context, RedisBroker
from propan.annotations import ContextRepo

broker = RedisBroker("redis://localhost:6379")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, Context, RabbitBroker
from propan.annotations import ContextRepo

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, Context, KafkaBroker
from propan.annotations import ContextRepo

broker = KafkaBroker("localhost:9092")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, Context, SQSBroker
from propan.annotations import ContextRepo

broker = SQSBroker("http://localhost:9324", ...)
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from propan import PropanApp, Context, NatsBroker
from propan.annotations import ContextRepo

broker = NatsBroker("nats://localhost:4222")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}

Несколько хуков

Если вы хотите объявить несколько хуков жизненного цикла, они будут использоваться в порядке их регистрации:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from propan import PropanApp, Context
from propan.annotations import ContextRepo

app = PropanApp()

@app.on_startup
async def setup(context: ContextRepo):
    context.set_global("field", 1)

@app.on_startup
async def setup_later(field: int = Context()):
    assert field == 1

Еще немного деталей

Async или не async

В асинхронной версии приложения в качестве хуков могут использоваться как асинхронные, так и синхронные методы. В синхронной версии доступны только синхронные методы.

Аргументы командной строки

Аргументы командной строки доступно во всех @app.on_startup хуках. Для использования их в других частях приложения поместите их в ContextRepo.

Инициализация брокера

Хуки @app.on_startup вызываются ДО запуска брокера приложением. Хуки @app.after_shutdown запускаются ПОСЛЕ остановки брокера.

Если же вы хотите совершить какие-то действия ПОСЛЕ инициализации брокера: отправить сообщения, инициализировать объекты и т.д., вам стоит использовать хук @app.after_startup.