RPC

Reasons to use RPC over MQ

Sometimes you may need to not just send a message but also get a response to it. HTTP is usually used for this, but we already have a message delivery system, so why don't we use it?

RPC requests on top of message brokers are executed very simply: we send a message to one queue, and receive a response from another. It looks a bit awkward, but this strategy has some advantages.

  1. The time between a request and a response is unlimited: we can send a request and receive the response a day later. An HTTP request does not allow that. This is extremely useful for services that perform long-running work: processing files, running neural networks, etc.
  2. Asynchrony: we can decide for ourselves whether to wait for an answer right now or just send a request and process it when it is ready.
  3. One request - many responses: Using message brokers, we may get many responses to a single request. For example, with a request, we can initialize a communication channel through which data will be sent back as soon as it is ready.
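The two-queue idea can be sketched without any broker at all. The example below is only an in-memory illustration built on `asyncio.Queue`; the names `server`, `call`, and `demo`, and the use of a correlation id to match a reply to its request, are assumptions made for the sketch and do not describe how Propan implements RPC internally.

```python
import asyncio
import uuid


async def server(requests: asyncio.Queue, replies: asyncio.Queue) -> None:
    """Consume requests from one queue and publish responses to another."""
    while True:
        correlation_id, body = await requests.get()
        if body == "ping":
            # Echo the correlation id so the client can match the reply.
            await replies.put((correlation_id, "pong!"))


async def call(requests: asyncio.Queue, replies: asyncio.Queue, body: str) -> str:
    """Send a request, then wait for the reply carrying the same id."""
    correlation_id = uuid.uuid4().hex
    await requests.put((correlation_id, body))
    while True:
        reply_id, reply = await replies.get()
        # Replies with other ids are simply skipped in this sketch;
        # a real client would route them to their own callers.
        if reply_id == correlation_id:
            return reply


async def demo() -> str:
    requests: asyncio.Queue = asyncio.Queue()
    replies: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(server(requests, replies))
    try:
        return await call(requests, replies, "ping")
    finally:
        worker.cancel()  # stop the in-memory "server"


result = asyncio.run(demo())
print(result)
```

Because the reply travels through its own queue, nothing forces the caller to wait for it immediately, which is what makes the long delays and asynchronous handling listed above possible.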

Implementation

Server

On the server side (the receiving side), you do not need to change your code: the return value of your function is automatically sent to the client if it is waiting for a response to your message.

Note

The result of your function must match the valid types of the message parameter of the broker.publish function.

Acceptable types are str, dict, Sequence, pydantic.BaseModel, bytes and a native message of the library used for the broker.

Redis

from propan import RedisBroker

broker = RedisBroker("redis://localhost:6379")

@broker.handle("ping")
async def ping(m: str):
    return "pong!"  # <-- send RPC response

RabbitMQ

from propan import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@127.0.0.1/")

@broker.handle("ping")
async def ping(m: str):
    return "pong!"  # <-- send RPC response

NATS

from propan import NatsBroker

broker = NatsBroker("nats://localhost:4222")

@broker.handle("ping")
async def ping(m: str):
    return "pong!"  # <-- send RPC response

Client

Blocking request

To wait for the result of executing the request "right here" (as if it were an HTTP request), you just need to specify the parameter callback=True when sending the message.

Redis

from propan import RedisBroker

async def main():
    async with RedisBroker("redis://localhost:6379") as broker:
        r = await broker.publish(
            "hi!", "ping",
            callback=True
        )

    assert r == "pong!"  # <-- take the RPC response

RabbitMQ

from propan import RabbitBroker

async def main():
    async with RabbitBroker("amqp://guest:guest@127.0.0.1/") as broker:
        r = await broker.publish(
            "hi!", "ping",
            callback=True
        )

    assert r == "pong!"  # <-- take the RPC response

NATS

from propan import NatsBroker

async def main():
    async with NatsBroker("nats://localhost:4222") as broker:
        r = await broker.publish(
            "hi!", "ping",
            callback=True
        )

    assert r == "pong!"  # <-- take the RPC response

To set how long the client is ready to wait for a response from the server, use the callback_timeout parameter (30 seconds by default).

await broker.publish(
    "hi!", "ping",
    callback=True,
    callback_timeout=3.0  # wait up to 3 seconds for the result
)

If you are ready to wait for a response as long as it takes, set callback_timeout=None.

await broker.publish(
    "hi!", "ping",
    callback=True,
    callback_timeout=None
)

Warning

This code will wait for a response indefinitely, even if the server is unable to process the message or processing takes a long time.

By default, if the server response does not arrive within the timeout, the function returns None. If you want to handle TimeoutError explicitly, use the raise_timeout parameter.

await broker.publish(
    "hi!", "ping",
    callback=True,
    raise_timeout=True
)
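
The three timeout behaviours described above (return None, raise, or wait indefinitely) can be imitated with plain `asyncio`. This is a rough sketch of the semantics only: `request` and `slow_server` are hypothetical stand-ins rather than Propan functions, and the sketch makes no claim about how Propan enforces the timeout internally.

```python
import asyncio
from typing import Optional, Tuple


async def slow_server() -> str:
    """Hypothetical server that needs 0.2 seconds to answer."""
    await asyncio.sleep(0.2)
    return "pong!"


async def request(
    timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[str]:
    """Wait for the response; timeout=None means wait as long as it takes."""
    try:
        return await asyncio.wait_for(slow_server(), timeout)
    except asyncio.TimeoutError:
        if raise_timeout:
            raise TimeoutError("no response within the timeout")
        return None  # default behaviour: swallow the timeout


async def demo() -> Tuple[Optional[str], bool, Optional[str]]:
    quiet = await request(timeout=0.05)  # too short, yields None
    try:
        await request(timeout=0.05, raise_timeout=True)
        raised = False
    except TimeoutError:
        raised = True
    patient = await request(timeout=None)  # no limit, waits for the answer
    return quiet, raised, patient


outcomes = asyncio.run(demo())
print(outcomes)
```

Returning None keeps the calling code short when a missing answer is acceptable, while raising makes the failure impossible to overlook.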

Non-blocking request

To process the response outside of the main execution loop, you can initialize a handler and then pass its queue as the reply_to argument of the request.

Redis

import asyncio
from propan import RedisBroker

broker = RedisBroker("redis://localhost:6379")

@broker.handle("reply")
async def get_message(m: str):
    assert m == "pong!"  # <-- take the RPC response

async def main():
    await broker.start()

    await broker.publish(
        "hello", "ping",
        reply_to="reply"
    )

    try:
        await asyncio.Future()
    finally:
        await broker.close()

asyncio.run(main())

RabbitMQ

import asyncio
from propan import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@127.0.0.1/")

@broker.handle("reply")
async def get_message(m: str):
    assert m == "pong!"  # <-- take the RPC response

async def main():
    await broker.start()

    await broker.publish(
        "hello", "ping",
        reply_to="reply"
    )

    try:
        await asyncio.Future()
    finally:
        await broker.close()

asyncio.run(main())

NATS

import asyncio
from propan import NatsBroker

broker = NatsBroker("nats://localhost:4222")

@broker.handle("reply")
async def get_message(m: str):
    assert m == "pong!"  # <-- take the RPC response

async def main():
    await broker.start()

    await broker.publish(
        "hello", "ping",
        reply_to="reply"
    )

    try:
        await asyncio.Future()
    finally:
        await broker.close()

asyncio.run(main())

Note

The broker must be running to consume non-blocking responses. This means non-blocking RPC cannot be used with the broker as a context manager.
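
To illustrate why a running consumer is needed, here is a broker-free sketch of the non-blocking pattern using only `asyncio`. The names `on_reply`, `consume`, `server`, and the `queues` dictionary are hypothetical; the sketch also assumes, purely for illustration, that the server answers one request with two responses.

```python
import asyncio
from typing import Dict, List

received: List[str] = []


async def on_reply(message: str) -> None:
    """Hypothetical reply handler, called once per response."""
    received.append(message)


async def server(requests: asyncio.Queue, queues: Dict[str, asyncio.Queue]) -> None:
    """Answer each request by publishing to the queue named in reply_to."""
    while True:
        body, reply_to = await requests.get()
        # One request may produce several responses.
        for part in ("pong 1", "pong 2"):
            await queues[reply_to].put(part)


async def consume(queue: asyncio.Queue) -> None:
    """Background consumer that dispatches replies to the handler."""
    while True:
        await on_reply(await queue.get())


async def demo() -> None:
    requests: asyncio.Queue = asyncio.Queue()
    queues = {"reply": asyncio.Queue()}
    tasks = [
        asyncio.create_task(server(requests, queues)),
        asyncio.create_task(consume(queues["reply"])),
    ]
    # Publish the request and return immediately, without awaiting a reply.
    await requests.put(("hello", "reply"))
    # Stand-in for other work; replies are handled in the background.
    await asyncio.sleep(0.1)
    for task in tasks:
        task.cancel()


asyncio.run(demo())
print(received)
```

If the `consume` task were never started, the replies would sit in the queue unhandled, which mirrors the requirement that the broker be running.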