Перейти к содержанию

RPC

Зачем использовать RPC over MQ

Иногда вам может понадобиться не просто отправить сообщение, но и получить на него ответ. Обычно для этого используют HTTP, однако у нас уже есть система доставки сообщений, почему бы нам не использовать ее?

RPC запросы поверх брокеров сообщений выполняются очень просто: мы отправляем сообщение в одну очередь, а ответ получаем в другую. Выглядит несколь топорно, однако такой подход несет в себе некоторые преимущества.

  1. Время между вопросом и ответом ничем не ограничено: мы можем отправить запрос, а ответ получить через сутки. HTTP запрос таких волностей нам не позволяет. Это может быть крайне полезно для сервисов, который выполняют продолжительную работу: обрабатывают файлы, запускают нейросети и т.д.
  2. Асинхронность: мы сами можем решать, ждать нам ответ прямо сейчас или просто отправить запрос, а обработать, когда он будет готов.
  3. Один запрос - много ответов: используя брокеры сообщений мы вполне можем получить множество ответов на один запрос. Например, запросом мы можем инициализировать канал связи, по которому обратно будут посылаться данные по мере готовности.

Реализация

Сервер

Со стороны сервера (принимающей стороны) вам не нужно никак изменять код: return вашей функции автоматически будет отправлен клиенту, если он ожидает ответ на ваше сообщение.

Note

Результат вашей функции должен соответствовать допустимым типам параметра message функции broker.publish.

Допустимые типы: str, dict, Sequence, pydantic.BaseModel, bytes, а также нативные сообщения используемой для брокера библиотеки.

1
2
3
4
5
6
7
from propan import RedisBroker

broker = RedisBroker("redis://localhost:6379")

@broker.handle("ping")
async def ping(m: str):
    return "pong!"  # <-- send RPC response
1
2
3
4
5
6
7
from propan import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@127.0.0.1/")

@broker.handle("ping")
async def ping(m: str):
    return "pong!"  # <-- send RPC response
1
2
3
4
5
6
7
from propan import NatsBroker

broker = NatsBroker("nats://localhost:4222")

@broker.handle("ping")
async def ping(m: str):
    return "pong!"  # <-- send RPC response

Клиент

Блокирующий запрос

Для того, чтобы дождаться результата выполнения запроса "прямо здесь" (как если бы это был HTTP запрос) необходимо просто указать параметер callback=True при отправке сообщения.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from propan import RedisBroker

async def main():
    async with RedisBroker("redis://localhost:6379") as broker:
        r = await broker.publish(
            "hi!", "ping",
            callback=True
        )

    assert r == "pong"  # <-- take the RPC response
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from propan import RabbitBroker

async def main():
    async with RabbitBroker("amqp://guest:guest@127.0.0.1/") as broker:
        r = await broker.publish(
            "hi!", "ping",
            callback=True
        )

    assert r == "pong"  # <-- take the RPC response
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from propan import NatsBroker

async def main():
    async with NatsBroker("nats://localhost:4222") as broker:
        r = await broker.publish(
            "hi!", "ping",
            callback=True
        )

    assert r == "pong"  # <-- take the RPC response

Для установки времени, которое клиент готов ожидать ответ от сервера, используйте параметр callback_timeout (по умолчанию - 30 секунд)

1
2
3
4
5
await broker.publish(
    "hi!", "ping",
    callback=True,
    callback_timeout=3.0  # (1)
)
  1. Ожидает результат выполнения 3 секунды

Если вы готовы ждать ответ столько, сколько это понадобится вы можете выставить callback_timeout=None

1
2
3
4
5
await broker.publish(
    "hi!", "ping",
    callback=True,
    callback_timeout=None
)

Warning

Этот код будет ожидать ответ бесконечно, даже если сервер не сможет обработать сообщение или обработка занимает длительное время

По умолчанию, если Propan не дождался ответа сервера, функция вернет None. Если же вы хотите явным образом обработать TimeoutError, используйте параметр raise_timeout.

1
2
3
4
5
await broker.publish(
    "hi!", "ping",
    callback=True,
    raise_timeout=True
)

Неблокирующий запрос

Для того, чтобы обработать ответ вне основного потока выполнения, вы можете просто инициализировать обработчик, а затем передать его адрес в качестве reply_to аргумента запроса.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
from propan import RedisBroker

broker = RedisBroker("redis://localhost:6379")

@broker.handle("reply")
async def get_message(m: str):
    assert m == "pong!"  # <-- take the RPC response

async def main():
    await broker.start()

    await broker.publish(
        "hello", "ping",
        reply_to="reply"
    )

    try:
        await asyncio.Future()
    finally:
        await broker.close()

asyncio.run(main())
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
from propan import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@127.0.0.1/")

@broker.handle("reply")
async def get_message(m: str):
    assert m == "pong!"  # <-- take the RPC response

async def main():
    await broker.start()

    await broker.publish(
        "hello", "ping",
        reply_to="reply"
    )

    try:
        await asyncio.Future()
    finally:
        await broker.close()

asyncio.run(main())
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
from propan import NatsBroker

broker = NatsBroker("nats://localhost:4222")

@broker.handle("reply")
async def get_message(m: str):
    assert m == "pong!"  # <-- take the RPC response

async def main():
    await broker.start()

    await broker.publish(
        "hello", "ping",
        reply_to="reply"
    )

    try:
        await asyncio.Future()
    finally:
        await broker.close()

asyncio.run(main())

Note

Обратите внимание, что для работы неблокирующих сообщений, broker должен быть запущен. Это значит, что мы не можем работать с такими сообщениями, используя broker как контекстный менеджер.