Тестирование
Для того, чтобы протестировать ваше приложение локально, либо в рамках CI пайплайна, вам хочется уменьшить количество внешних зависимостей.
Гораздо проще сразу запустить набор тестов, чем пытаться поднимать контейнер с вашим Брокером Сообщений в рамках CI пайплайна.
Также, отстуствие внешних зависимостей позволит избежать ложного падения тестов, которое может быть связано с ошибками передачи данных до брокера, либо слишком ранним обращением к нему (когда контейнер еще не готов принимать сообщения).
Note
Для запуска асинхронных тестов с помощью pytest , вам необходимо установить расширения.
Вы можете воспользоваться, например pytest-asyncio или anyio
Модификация брокера
С этой целью Propan позволяет модифицировать поведение вашего брокера так, чтобы он передавал сообщения "в памяти", не требуя запуска внешних зависимостей.
Допустим, у нас есть приложение со следующим содержанием:
Redis RabbitMQ Kafka SQS NATS
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , RedisBroker
broker = RedisBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
Для того, чтобы протестировать его без запуска Redis необходимо модифицировать брокера с помощью propan.test.TestRedisBroker
:
test_ping.py from propan.test import TestRedisBroker
from main import broker
async def test_publish ():
async with TestRedisBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , RabbitBroker
broker = RabbitBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
Для того, чтобы протестировать его без запуска RabbitMQ необходимо модифицировать брокера с помощью propan.test.TestRabbitBroker
:
test_ping.py from propan.test import TestRabbitBroker
from main import broker
async def test_publish ():
async with TestRabbitBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , KafkaBroker
broker = KafkaBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
Для того, чтобы протестировать его без запуска Kafka необходимо модифицировать брокера с помощью propan.test.TestKafkaBroker
:
test_ping.py from propan.test import TestKafkaBroker
from main import broker
async def test_publish ():
async with TestKafkaBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , SQSBroker
broker = SQSBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
Для того, чтобы протестировать его без запуска ElasticMQ необходимо модифицировать брокера с помощью propan.test.TestSQSBroker
:
test_ping.py from propan.test import TestSQSBroker
from main import broker
async def test_publish ():
async with TestSQSBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
main.py 1
2
3
4
5
6
7
8
9
10
11
12 from propan import PropanApp , NatsBroker
broker = NatsBroker ()
@broker . handler ( "ping" )
async def healthcheck ( msg : str ) -> str :
if msg == "ping" :
return "pong"
else :
return "wrong"
app = PropanApp ( broker )
Для того, чтобы протестировать его без запуска NATS необходимо модифицировать брокера с помощью propan.test.TestNatsBroker
:
test_ping.py from propan.test import TestNatsBroker
from main import broker
async def test_publish ():
async with TestNatsBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
А затем мы делает RPC запрос для того, чтобы проверить результат выполнения:
Redis RabbitMQ Kafka SQS NATS
async with TestRedisBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestRabbitBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestKafkaBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestSQSBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
async with TestNatsBroker ( broker ) as test_broker :
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
Note
При использовании тестового брокера RPC запросы работают даже у брокеров, которые не поддерживают их в обычном режиме.
Использование фикстур
В больших приложений для переиспользования тестового брокера вы можете использовать фикстуру следующего содержания:
Redis RabbitMQ Kafka SQS NATS
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestRedisBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestRedisBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestRabbitBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestRabbitBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestKafkaBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestKafkaBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestSQSBroker
from main import broker
@pytest . fixture ()
def test_broker ():
async with TestSQSBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
test_broker.py 1
2
3
4
5
6
7
8
9
10
11
12
13 import pytest
from propan.test import TestNatsBroker
from main import broker
@pytest . fixture ()
async def test_broker ():
async with TestNatsBroker ( broker ) as b :
yield b
async def test_publish ( test_broker ):
r = await test_broker . publish ( "ping" , "ping" , callback = True )
assert r == "pong"
Tip
Данный подход имеет существенный недостаток: ошибки, возникшие внутри обработчика, не могут быть захвачены внутри ваших тестов.
Например, следующий тест вернет None
, а внутри обработчика - возникнет pydantic.ValidationError
:
async def test_publish ( test_broker ):
r = await test_broker . publish (
{ "msg" : "ping" }, "ping" ,
callback = True , callback_timeout = 1
)
assert r == None
Также этот тест будет заблокировать на callback_timeout
(по умолчанию 30 секунд), что может может сильно раздражать, когда внутри разрабатываемого
обработчика возникают ошибки, а ваши тесты отваливаются по длительному таймауту с None
.
Прямой вызов функций
Propan предоставляет возможность вызывать функции-обработчики напрямую: так, как если бы это были обычные функции.
Для этого вам нужно сконструироваться сообщение с помощью метода build_message
так, если бы это был publish
(сигнатуры методов совпадают), а затем
передать это сообщение в ваш обработчик в качестве единственного аргумента функции.
При этом, если вы хотите, чтобы захватывать исключения обработчика, вам нужно использовать флаг reraise_exc=True
при вызове:
Redis RabbitMQ Kafka SQS NATS
test_ping.py import pytest
import pydantic
from propan.test.redis import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.rabbit import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.kafka import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.sqs import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
test_ping.py import pytest
import pydantic
from propan.test.nats import build_message
from main import healthcheck
async def test_publish ( test_broker ):
msg = build_message ({ "msg" : "ping" }, "ping" )
with pytest . raises ( pydantic . ValidationError ):
await healthcheck ( msg , reraise_exc = True )
Таким образом, Propan предоставляет вам полный инструментарий для тестирования ваших обработчиков: от валидации RPC ответов до корректно выполнения тела функций.