RPC¶
Зачем использовать RPC over MQ¶
Иногда вам может понадобиться не просто отправить сообщение, но и получить на него ответ. Обычно для этого используют HTTP, однако у нас уже есть система доставки сообщений, почему бы нам не использовать ее?
RPC запросы поверх брокеров сообщений выполняются очень просто: мы отправляем сообщение в одну очередь, а ответ получаем в другую. Выглядит несколь топорно, однако такой подход несет в себе некоторые преимущества.
- Время между вопросом и ответом ничем не ограничено: мы можем отправить запрос, а ответ получить через сутки. HTTP запрос таких волностей нам не позволяет. Это может быть крайне полезно для сервисов, который выполняют продолжительную работу: обрабатывают файлы, запускают нейросети и т.д.
- Асинхронность: мы сами можем решать, ждать нам ответ прямо сейчас или просто отправить запрос, а обработать, когда он будет готов.
- Один запрос - много ответов: используя брокеры сообщений мы вполне можем получить множество ответов на один запрос. Например, запросом мы можем инициализировать канал связи, по которому обратно будут посылаться данные по мере готовности.
Реализация¶
Сервер¶
Со стороны сервера (принимающей стороны) вам не нужно никак изменять код: return
вашей функции автоматически будет отправлен клиенту, если он ожидает ответ на ваше сообщение.
Note
Результат вашей функции должен соответствовать допустимым типам параметра message
функции broker.publish
.
Допустимые типы: str
, dict
, Sequence
, pydantic.BaseModel
, bytes
, а также нативные сообщения используемой для брокера библиотеки.
1 2 3 4 5 6 7 |
|
1 2 3 4 5 6 7 |
|
1 2 3 4 5 6 7 |
|
Клиент¶
Блокирующий запрос¶
Для того, чтобы дождаться результата выполнения запроса "прямо здесь" (как если бы это был HTTP запрос) необходимо просто указать
параметер callback=True
при отправке сообщения.
1 2 3 4 5 6 7 8 9 10 |
|
1 2 3 4 5 6 7 8 9 10 |
|
1 2 3 4 5 6 7 8 9 10 |
|
Для установки времени, которое клиент готов ожидать ответ от сервера, используйте параметр callback_timeout
(по умолчанию - 30 секунд)
1 2 3 4 5 |
|
- Ожидает результат выполнения 3 секунды
Если вы готовы ждать ответ столько, сколько это понадобится вы можете выставить callback_timeout=None
1 2 3 4 5 |
|
Warning
Этот код будет ожидать ответ бесконечно, даже если сервер не сможет обработать сообщение или обработка занимает длительное время
По умолчанию, если Propan не дождался ответа сервера, функция вернет None
. Если же вы хотите явным образом обработать TimeoutError
,
используйте параметр raise_timeout
.
1 2 3 4 5 |
|
Неблокирующий запрос¶
Для того, чтобы обработать ответ вне основного потока выполнения, вы можете просто инициализировать обработчик, а затем передать его адрес в качестве reply_to
аргумента запроса.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
Note
Обратите внимание, что для работы неблокирующих сообщений, broker
должен быть запущен. Это значит, что мы не можем
работать с такими сообщениями, используя broker
как контекстный менеджер.