Testing
To test your app locally or in a CI pipeline, you want to reduce the number of external dependencies.
This allows running a test suite more quickly than when a container with your Message Broker needs to be instantiated within the CI pipeline.
Also, the absence of dependencies helps to avoid test failures that are due to errors in transmitting data to the broker, or accessing the broker too early (when the container is not yet ready to receive connections).
Note
To run asynchronous tests using pytest, you need to install an extension.
You can use, for example, pytest-asyncio or anyio.
Broker modification
Propan allows you to modify the behavior of your broker so that it passes messages "in memory", without requiring any external dependencies to be running.
Let's imagine we have an application like so:
Redis RabbitMQ Kafka SQS NATS
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , RedisBroker
broker = RedisBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
In order to test it without running Redis, you need to modify the broker with propan.test.TestRedisBroker:
test_ping.py from propan.test import TestRedisBroker
from main import broker
async def test_publish ():
async with TestRedisBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , RabbitBroker
broker = RabbitBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
In order to test it without running RabbitMQ, you need to modify the broker with propan.test.TestRabbitBroker:
test_ping.py from propan.test import TestRabbitBroker
from main import broker
async def test_publish ():
async with TestRabbitBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , KafkaBroker
broker = KafkaBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
In order to test it without running Kafka, you need to modify the broker with propan.test.TestKafkaBroker:
test_ping.py from propan.test import TestKafkaBroker
from main import broker
async def test_publish ():
async with TestKafkaBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , SQSBroker
broker = SQSBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
In order to test it without running ElasticMQ, you need to modify the broker with propan.test.TestSQSBroker:
test_ping.py from propan.test import TestSQSBroker
from main import broker
async def test_publish ():
async with TestSQSBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , NatsBroker
broker = NatsBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
In order to test it without running NATS, you need to modify the broker with propan.test.TestNatsBroker:
test_ping.py from propan.test import TestNatsBroker
from main import broker
async def test_publish ():
async with TestNatsBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
Then make an RPC request to check the result of the execution:
Redis RabbitMQ Kafka SQS NATS
async with TestRedisBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestRabbitBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestKafkaBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestSQSBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestNatsBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
Note
When using the test broker this way, it is always possible to perform RPC requests, even if the broker doesn't support them in regular mode.
Using fixtures
In large applications, you can reuse the test broker across tests with the following fixture:
Redis RabbitMQ Kafka SQS NATS
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestRedisBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestRedisBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestRabbitBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestRabbitBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestKafkaBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestKafkaBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestSQSBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestSQSBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestNatsBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestNatsBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
Tip
This approach has a major weakness: errors raised inside a handler cannot be captured in your tests.
For example, in the following test publish will return None, while inside the handler a pydantic.ValidationError will be raised (the handler expects a str, but the test publishes a dict):
async def test_publish ( test_broker ):
r = await test_broker . publish (
{ "msg" : "ping" }, "ping" ,
callback = True , callback_timeout = 1
)
assert r == None
Also, this test will block for callback_timeout (30 seconds by default), which can be very annoying when a handler has a development error: your tests fail only after a long timeout, with a result of None.
Regular function calling
Propan provides the ability to run handler functions as if they were regular functions.
To do this, construct a message with build_message as if you were calling publish (the two have the same signature), and pass this message to your handler as its single argument.
In addition, if you want to catch handler exceptions, you need to pass the reraise_exc=True flag when calling the handler:
Redis RabbitMQ Kafka SQS NATS
test_ping.py import pytest
import pydantic
from propan.test.redis import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.rabbit import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.kafka import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.sqs import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.nats import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
Thus, Propan provides you with a complete toolkit for testing your handlers, from checking RPC responses to correctly executing body functions.