
LIFESPANS

Sometimes you need to define logic that runs before the application launches. This code is executed once, before your application starts receiving messages.

You may also need to terminate some processes when the application stops. This code is also executed exactly once, but after the main application has finished.

Since this code is executed before the application starts and after it stops, it covers the entire lifecycle (lifespan) of the application.

This can be very useful for initializing your application settings at startup, opening a pool of database connections, or loading machine learning models.
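The lifespan idea can be sketched without any framework. The snippet below is only an illustration using the standard library, not Propan's implementation; the names on_startup, handle_messages, on_shutdown and run are hypothetical. It shows one piece of code running before the main loop and another running after it, each exactly once.

```python
import asyncio

events = []  # records the order in which things happen


async def on_startup():
    # Runs once, before any message is handled (e.g. open a connection pool).
    events.append("startup")


async def handle_messages():
    # Stand-in for the application's main loop.
    for msg in ("a", "b"):
        events.append(f"handled {msg}")


async def on_shutdown():
    # Runs once, after the main loop has finished (e.g. close the pool).
    events.append("shutdown")


async def run():
    await on_startup()
    try:
        await handle_messages()
    finally:
        # The shutdown step runs even if the main loop raises.
        await on_shutdown()


asyncio.run(run())
print(events)
```

The try/finally is a design choice in this sketch: it keeps the cleanup step from being skipped when message handling fails.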

Usage example

Let's imagine that your application uses pydantic as its settings manager.

I highly recommend using pydantic for this purpose, because Propan already depends on it and you don't have to install an additional package.

Also, let's imagine that you have several settings files (.env, .env.development, .env.test, .env.production) and you want to switch between them at startup without any code changes.

Propan makes this easy by passing optional command-line arguments through to your code.

Lifespan

Let's write some code for our example:

SQS

from propan import PropanApp, SQSBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = SQSBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "http://localhost:9324"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

RabbitMQ

from propan import PropanApp, RabbitBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = RabbitBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "amqp://guest:guest@localhost:5672/"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

Kafka

from propan import PropanApp, KafkaBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = KafkaBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "localhost:9092"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

Redis

from propan import PropanApp, RedisBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = RedisBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "redis://localhost:6379"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

NATS

from propan import PropanApp, NatsBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings

broker = NatsBroker()
app = PropanApp(broker)

class Settings(BaseSettings):
    host: str = "nats://localhost:4222"

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

Now this application can be run using the following command to manage the environment:

propan run serve:app --env .env.test

Details

Now let's look at this in a little more detail.

To begin with, we used a decorator

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

to declare a function that should run when our application starts.

The next step is to declare the arguments that our function will receive:

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

In this case, the env field is passed to the setup function from the command-line arguments.
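To make the mechanism concrete, here is a rough sketch of how extra command-line options could be matched to a hook's parameters by name. This is an assumption-laden illustration built on the standard library, not Propan's actual code; parse_extra_options and call_with_matching_options are hypothetical helpers.

```python
import inspect


def parse_extra_options(argv):
    """Turn ["--env", ".env.test"] into {"env": ".env.test"}."""
    options = {}
    it = iter(argv)
    for token in it:
        if token.startswith("--"):
            # Assumes every option is followed by exactly one value.
            options[token[2:].replace("-", "_")] = next(it, None)
    return options


def call_with_matching_options(hook, options):
    """Call `hook`, passing only the options its signature declares."""
    accepted = inspect.signature(hook).parameters
    kwargs = {name: value for name, value in options.items() if name in accepted}
    return hook(**kwargs)


def setup(env: str = ".env"):
    return f"loading settings from {env}"


extra = parse_extra_options(["--env", ".env.test"])
from_cli = call_with_matching_options(setup, extra)  # uses the CLI value
from_default = call_with_matching_options(setup, {})  # falls back to the default
print(from_cli)
print(from_default)
```

When the option is omitted, the parameter's default (".env" here) applies, which matches the behaviour described for the setup hook.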

Tip

By default, lifecycle functions are wrapped with the @apply_types decorator, so all context fields and dependencies are available in them.

Then, we initialized our application settings using the file passed to us from the command line:

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

And put these settings in the global context:

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)
Note

Now we can access our settings anywhere in the application right from the context:

from propan import Context, apply_types


@apply_types
async def func(settings=Context()): ...

As the last step, we initialized our broker: now, when the application starts, it will be ready to receive messages.

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

Another example

Now let's imagine that we have a machine learning model that needs to process messages from some broker.

Initialization of such models usually takes a long time. It would be wise to do this at the start of the application, and not when processing each message.

You could initialize your model somewhere at the top of your module/file. However, that code would then run whenever the module is imported, for example during testing. It is unlikely that you want to load your model on every test run.

Therefore, it is worth initializing the model in the @app.on_startup hook.
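The difference between module-level and deferred initialization can be seen in a small framework-free sketch. The names load_model, models and on_startup are hypothetical, and load_count exists only to make the effect observable.

```python
load_count = 0


def load_model():
    """Stand-in for an expensive model load."""
    global load_count
    load_count += 1
    return lambda x: x * 42


# Module-level initialization would look like this and would run on import:
#     model = load_model()

models = {}


def on_startup():
    # Deferred initialization: runs only when the application starts.
    models["answer"] = load_model()


# Importing the module (e.g. from a test) has not loaded anything yet.
assert load_count == 0

on_startup()
print(load_count, models["answer"](2))
```

With the deferred version, a test suite can import the module freely and pays the loading cost only if it actually starts the application.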

We also want the model to shut down cleanly when the application stops. For that, we need the @app.on_shutdown hook:

Redis

from propan import PropanApp, Context, RedisBroker
from propan.annotations import ContextRepo

broker = RedisBroker("redis://localhost:6379")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}

RabbitMQ

from propan import PropanApp, Context, RabbitBroker
from propan.annotations import ContextRepo

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}

Kafka

from propan import PropanApp, Context, KafkaBroker
from propan.annotations import ContextRepo

broker = KafkaBroker("localhost:9092")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}

SQS

from propan import PropanApp, Context, SQSBroker
from propan.annotations import ContextRepo

broker = SQSBroker("http://localhost:9324", ...)
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}

NATS

from propan import PropanApp, Context, NatsBroker
from propan.annotations import ContextRepo

broker = NatsBroker("nats://localhost:4222")
app = PropanApp(broker)

ml_models = {}  # fake ML model

def fake_answer_to_everything_ml_model(x: float):
    return x * 42

@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)

@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()

@broker.handle("test")
async def predict(x: float, model = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}

Multiple hooks

If you declare multiple lifecycle hooks, they are called in the order they were registered:

from propan import PropanApp, Context
from propan.annotations import ContextRepo

app = PropanApp()

@app.on_startup
async def setup(context: ContextRepo):
    context.set_global("field", 1)

@app.on_startup
async def setup_later(field: int = Context()):
    assert field == 1

Some more details

Async or not async

In the asynchronous version of the application, both asynchronous and synchronous methods can be used as hooks. In the synchronous version, only synchronous methods are available.
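One way an asynchronous runner can accept both kinds of hooks is to call each hook and await the result only when it is awaitable. The sketch below uses only the standard library and is not taken from Propan's source; run_hooks, sync_hook and async_hook are hypothetical names.

```python
import asyncio
import inspect

calls = []


def sync_hook():
    calls.append("sync")


async def async_hook():
    await asyncio.sleep(0)
    calls.append("async")


async def run_hooks(hooks):
    for hook in hooks:
        result = hook()
        # Await only when the hook returned an awaitable (i.e. it was async).
        if inspect.isawaitable(result):
            await result


asyncio.run(run_hooks([sync_hook, async_hook]))
print(calls)
```

A purely synchronous runner has no event loop in which to await the coroutine returned by an async hook, which is one plausible reason the synchronous version accepts only synchronous hooks.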

Command line arguments

Command line arguments are available in all @app.on_startup hooks. To use them in other parts of the application, put them in the ContextRepo.

Broker initialization

The @app.on_startup hooks are called BEFORE the broker is launched by the application. The @app.after_shutdown hooks are triggered AFTER stopping the broker.

If you want to perform actions AFTER the broker is initialized (sending messages, initializing objects, etc.), use the @app.after_startup hook.
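The ordering of the hook stages around the broker can be pictured with a toy runner. FakeBroker, run and make_hook are hypothetical, and placing on_shutdown before the broker stops is an assumption of this sketch rather than something stated above.

```python
import asyncio

log = []


class FakeBroker:
    """Hypothetical stand-in for a real broker."""

    async def start(self):
        log.append("broker started")

    async def close(self):
        log.append("broker stopped")


async def run(broker, hooks):
    for hook in hooks["on_startup"]:
        await hook()  # broker is not running yet
    await broker.start()
    for hook in hooks["after_startup"]:
        await hook()  # broker is ready: safe to publish messages
    # ... messages would be processed here ...
    for hook in hooks["on_shutdown"]:
        await hook()  # assumption: runs before the broker stops
    await broker.close()
    for hook in hooks["after_shutdown"]:
        await hook()  # broker is already stopped


def make_hook(name):
    async def hook():
        log.append(name)
    return hook


stages = ("on_startup", "after_startup", "on_shutdown", "after_shutdown")
hooks = {stage: [make_hook(stage)] for stage in stages}
asyncio.run(run(FakeBroker(), hooks))
print(log)
```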