Skip to content

Testing

To test your app locally or in a CI pipeline, you want to reduce the number of external dependencies. This allows running a test suite more quickly than when a container with your Message Broker needs to be instantiated within the CI pipeline.

Also, the absence of dependencies helps to avoid test failures that are due to errors in transmitting data to the broker, or accessing the broker too early (when the container is not yet ready to receive connections).

Note

To run asynchronous tests using pytest, you need to install extensions.

You can use, for example, pytest-asyncio or anyio.

Broker modification

Propan allows you to modify the behavior of your broker so that it passes messages "in memory", without requiring any external dependencies to be running.

Let's imagine we have an application like so:

main.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from propan import PropanApp, RedisBroker

broker = RedisBroker()

@broker.handler("ping")
async def healthcheck(msg: str) -> str:
    """Answer "pong" to a "ping" message and "wrong" to anything else."""
    return "pong" if msg == "ping" else "wrong"

app = PropanApp(broker)

In order to test it without running Redis you need to modify the broker with propan.test.TestRedisBroker:

test_ping.py
1
2
3
4
5
6
7
8
from propan.test import TestRedisBroker

from main import broker

async def test_publish():
    """Publish "ping" through the patched broker and check the handler's reply."""
    # The application's broker is wrapped by TestRedisBroker only inside this block.
    async with TestRedisBroker(broker) as test_broker:
        # callback=True turns the publish into an RPC request whose result is returned.
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
main.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from propan import PropanApp, RabbitBroker

broker = RabbitBroker()

@broker.handler("ping")
async def healthcheck(msg: str) -> str:
    """Answer "pong" to a "ping" message and "wrong" to anything else."""
    return "pong" if msg == "ping" else "wrong"

app = PropanApp(broker)

In order to test it without running RabbitMQ you need to modify the broker with propan.test.TestRabbitBroker:

test_ping.py
1
2
3
4
5
6
7
8
from propan.test import TestRabbitBroker

from main import broker

async def test_publish():
    """Publish "ping" through the patched broker and check the handler's reply."""
    # The application's broker is wrapped by TestRabbitBroker only inside this block.
    async with TestRabbitBroker(broker) as test_broker:
        # callback=True turns the publish into an RPC request whose result is returned.
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
main.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from propan import PropanApp, KafkaBroker

broker = KafkaBroker()

@broker.handler("ping")
async def healthcheck(msg: str) -> str:
    """Answer "pong" to a "ping" message and "wrong" to anything else."""
    return "pong" if msg == "ping" else "wrong"

app = PropanApp(broker)

In order to test it without running Kafka you need to modify the broker with propan.test.TestKafkaBroker:

test_ping.py
1
2
3
4
5
6
7
8
from propan.test import TestKafkaBroker

from main import broker

async def test_publish():
    """Publish "ping" through the patched broker and check the handler's reply."""
    # The application's broker is wrapped by TestKafkaBroker only inside this block.
    async with TestKafkaBroker(broker) as test_broker:
        # callback=True turns the publish into an RPC request whose result is returned.
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
main.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from propan import PropanApp, SQSBroker

broker = SQSBroker()

@broker.handler("ping")
async def healthcheck(msg: str) -> str:
    """Answer "pong" to a "ping" message and "wrong" to anything else."""
    return "pong" if msg == "ping" else "wrong"

app = PropanApp(broker)

In order to test it without running ElasticMQ you need to modify the broker with propan.test.TestSQSBroker:

test_ping.py
1
2
3
4
5
6
7
8
from propan.test import TestSQSBroker

from main import broker

async def test_publish():
    """Publish "ping" through the patched broker and check the handler's reply."""
    # The application's broker is wrapped by TestSQSBroker only inside this block.
    async with TestSQSBroker(broker) as test_broker:
        # callback=True turns the publish into an RPC request whose result is returned.
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
main.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from propan import PropanApp, NatsBroker

broker = NatsBroker()

@broker.handler("ping")
async def healthcheck(msg: str) -> str:
    """Answer "pong" to a "ping" message and "wrong" to anything else."""
    return "pong" if msg == "ping" else "wrong"

app = PropanApp(broker)

In order to test it without running NATS you need to modify the broker with propan.test.TestNatsBroker:

test_ping.py
1
2
3
4
5
6
7
8
from propan.test import TestNatsBroker

from main import broker

async def test_publish():
    """Publish "ping" through the patched broker and check the handler's reply."""
    # The application's broker is wrapped by TestNatsBroker only inside this block.
    async with TestNatsBroker(broker) as test_broker:
        # callback=True turns the publish into an RPC request whose result is returned.
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"

Then make an RPC request to check the result of the execution:

    async with TestRedisBroker(broker) as test_broker:
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
    async with TestRabbitBroker(broker) as test_broker:
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
    async with TestKafkaBroker(broker) as test_broker:
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
    async with TestSQSBroker(broker) as test_broker:
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"
    async with TestNatsBroker(broker) as test_broker:
        r = await test_broker.publish("ping", "ping", callback=True)
    assert r == "pong"

Note

When using the test broker this way, it is always possible to perform RPC requests even if the broker doesn't support it in regular mode.

Using fixtures

In large applications, you can reuse the test broker across tests with the following fixture:

test_broker.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import pytest
from propan.test import TestRedisBroker

from main import broker

@pytest.fixture()
async def test_broker():
    """Yield the application's broker wrapped in TestRedisBroker."""
    async with TestRedisBroker(broker) as patched_broker:
        yield patched_broker

async def test_publish(test_broker):
    """Send "ping" via the test_broker fixture and expect "pong" back."""
    reply = await test_broker.publish("ping", "ping", callback=True)
    assert reply == "pong"
test_broker.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import pytest
from propan.test import TestRabbitBroker

from main import broker

@pytest.fixture()
async def test_broker():
    """Yield the application's broker wrapped in TestRabbitBroker."""
    async with TestRabbitBroker(broker) as patched_broker:
        yield patched_broker

async def test_publish(test_broker):
    """Send "ping" via the test_broker fixture and expect "pong" back."""
    reply = await test_broker.publish("ping", "ping", callback=True)
    assert reply == "pong"
test_broker.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import pytest
from propan.test import TestKafkaBroker

from main import broker

@pytest.fixture()
async def test_broker():
    """Yield the application's broker wrapped in TestKafkaBroker."""
    async with TestKafkaBroker(broker) as patched_broker:
        yield patched_broker

async def test_publish(test_broker):
    """Send "ping" via the test_broker fixture and expect "pong" back."""
    reply = await test_broker.publish("ping", "ping", callback=True)
    assert reply == "pong"
test_broker.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import pytest
from propan.test import TestSQSBroker

from main import broker

@pytest.fixture()
async def test_broker():
    """Yield the application's broker wrapped in TestSQSBroker."""
    # The fixture must be a coroutine function: `async with` is a SyntaxError
    # inside a plain `def`.
    async with TestSQSBroker(broker) as b:
        yield b

async def test_publish(test_broker):
    """Send "ping" via the test_broker fixture and expect "pong" back."""
    reply = await test_broker.publish("ping", "ping", callback=True)
    assert reply == "pong"
test_broker.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import pytest
from propan.test import TestNatsBroker

from main import broker

@pytest.fixture()
async def test_broker():
    """Yield the application's broker wrapped in TestNatsBroker."""
    async with TestNatsBroker(broker) as patched_broker:
        yield patched_broker

async def test_publish(test_broker):
    """Send "ping" via the test_broker fixture and expect "pong" back."""
    reply = await test_broker.publish("ping", "ping", callback=True)
    assert reply == "pong"

Tip

This approach has a major weakness: errors raised inside a handler cannot be caught in your tests.

For example, in the following test the publish call returns None, because a pydantic.ValidationError is raised inside the handler:

async def test_publish(test_broker):
    """Publish a dict payload to "ping" and observe that no reply comes back."""
    r = await test_broker.publish(
        {"msg": "ping"}, "ping",
        callback=True, callback_timeout=1
    )
    # None is a singleton, so compare by identity rather than equality.
    assert r is None

Also, this test will block for the whole callback_timeout (30 seconds by default), which can be very annoying: when a handler fails during development, your tests only fail after a long timeout, with a None result.

Regular function calling

Propan provides the ability to run handler functions as if they were regular functions.

To do this, you need to construct a message using build_message, calling it as if it were publish (the method signatures are the same), and pass this message to your handler as the single function argument.

test_ping.py
1
2
3
4
5
6
7
from propan.test.redis import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Call the handler directly with a message produced by build_message."""
    # NOTE(review): the test_broker fixture is requested but never used in the
    # body — confirm whether it is needed here.
    message = build_message("ping", "ping")
    reply = await healthcheck(message)
    assert reply == "pong"
test_ping.py
1
2
3
4
5
6
7
from propan.test.rabbit import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Call the handler directly with a message produced by build_message."""
    # NOTE(review): the test_broker fixture is requested but never used in the
    # body — confirm whether it is needed here.
    message = build_message("ping", "ping")
    reply = await healthcheck(message)
    assert reply == "pong"
test_ping.py
1
2
3
4
5
6
7
from propan.test.kafka import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Call the handler directly with a message produced by build_message."""
    # NOTE(review): the test_broker fixture is requested but never used in the
    # body — confirm whether it is needed here.
    message = build_message("ping", "ping")
    reply = await healthcheck(message)
    assert reply == "pong"
test_ping.py
1
2
3
4
5
6
7
from propan.test.sqs import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Call the handler directly with a message produced by build_message."""
    # NOTE(review): the test_broker fixture is requested but never used in the
    # body — confirm whether it is needed here.
    message = build_message("ping", "ping")
    reply = await healthcheck(message)
    assert reply == "pong"
test_ping.py
1
2
3
4
5
6
7
from propan.test.nats import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Call the handler directly with a message produced by build_message."""
    # NOTE(review): the test_broker fixture is requested but never used in the
    # body — confirm whether it is needed here.
    message = build_message("ping", "ping")
    reply = await healthcheck(message)
    assert reply == "pong"

That being said, if you want to catch handler exceptions, you need to use the reraise_exc=True calling flag:

test_ping.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import pytest
import pydantic
from propan.test.redis import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Expect pydantic.ValidationError from the handler when reraise_exc=True."""
    invalid_message = build_message({"msg": "ping"}, "ping")
    with pytest.raises(pydantic.ValidationError):
        await healthcheck(invalid_message, reraise_exc=True)
test_ping.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import pytest
import pydantic
from propan.test.rabbit import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Expect pydantic.ValidationError from the handler when reraise_exc=True."""
    invalid_message = build_message({"msg": "ping"}, "ping")
    with pytest.raises(pydantic.ValidationError):
        await healthcheck(invalid_message, reraise_exc=True)
test_ping.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import pytest
import pydantic
from propan.test.kafka import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Expect pydantic.ValidationError from the handler when reraise_exc=True."""
    invalid_message = build_message({"msg": "ping"}, "ping")
    with pytest.raises(pydantic.ValidationError):
        await healthcheck(invalid_message, reraise_exc=True)
test_ping.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import pytest
import pydantic
from propan.test.sqs import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Expect pydantic.ValidationError from the handler when reraise_exc=True."""
    invalid_message = build_message({"msg": "ping"}, "ping")
    with pytest.raises(pydantic.ValidationError):
        await healthcheck(invalid_message, reraise_exc=True)
test_ping.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import pytest
import pydantic
from propan.test.nats import build_message

from main import healthcheck

async def test_publish(test_broker):
    """Expect pydantic.ValidationError from the handler when reraise_exc=True."""
    invalid_message = build_message({"msg": "ping"}, "ping")
    with pytest.raises(pydantic.ValidationError):
        await healthcheck(invalid_message, reraise_exc=True)

Thus, Propan provides you with a complete toolkit for testing your handlers, from checking RPC responses to correctly executing body functions.