Message Publishing

Propan uses a single, unified method to send messages:

await broker.publish(message, ...)

Regardless of the broker, this method takes the message as its first argument. The remaining arguments are broker-specific.

The features specific to each broker are covered in that broker's section of the documentation.

Valid types to submit

Type                Send header        Method of casting to bytes
dict                application/json   json.dumps(message).encode()
Sequence            application/json   json.dumps(message).encode()
pydantic.BaseModel  application/json   message.json().encode()
str                 text/plain         message.encode()
bytes               -                  message

Also, some brokers support sending special types, which are described in the relevant section of your broker's documentation.
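
The casting rules in the table above can be sketched in plain Python. This is only an illustration of the table, not Propan's actual implementation: the function name `encode_message` is hypothetical, pydantic models are detected by duck typing on a `.json()` method so the sketch runs without pydantic installed, and returning `None` as the header for `bytes` is an assumption based on the blank cell in the table.

```python
import json
from collections.abc import Sequence
from typing import Any, Optional, Tuple


def encode_message(message: Any) -> Tuple[bytes, Optional[str]]:
    """Return (body, content_type) following the table above (illustrative only)."""
    # bytes are passed through unchanged; no header is assumed here
    if isinstance(message, bytes):
        return message, None
    # str must be checked before Sequence, because str is itself a Sequence
    if isinstance(message, str):
        return message.encode(), "text/plain"
    # Duck-typed stand-in for pydantic.BaseModel (assumption of this sketch)
    if callable(getattr(message, "json", None)):
        return message.json().encode(), "application/json"
    # dict and other sequences (list, tuple) are serialized as JSON
    if isinstance(message, (dict, Sequence)):
        return json.dumps(message).encode(), "application/json"
    raise TypeError(f"unsupported message type: {type(message).__name__}")


body, content_type = encode_message({"a": 1})
```

The order of the checks matters: `str` and `bytes` both satisfy `Sequence`, so they have to be handled before the generic JSON branch.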

Broker initialization

To send a message to a queue, you must first connect to it.

If you are inside a running Propan application, you don't need to do anything: the broker is already running. Just access it and send a message.

Redis

from propan import PropanApp, RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-channel")

RabbitMQ

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-queue")

Kafka

from propan import PropanApp, KafkaBroker

broker = KafkaBroker("localhost:9092")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-topic")

SQS

from propan import PropanApp, SQSBroker

broker = SQSBroker("http://localhost:9324", ...)
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-queue")

NATS

from propan import PropanApp, NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-subject")

If you are only using Propan to send asynchronous messages from within another framework, you can use the broker as a context manager.

Redis

async with RedisBroker("redis://localhost:6379") as broker:
    await broker.publish(m, "another-channel")

RabbitMQ

async with RabbitBroker("amqp://guest:guest@localhost:5672/") as broker:
    await broker.publish(m, "another-queue")

Kafka

async with KafkaBroker("localhost:9092") as broker:
    await broker.publish(m, "another-topic")

SQS

async with SQSBroker("http://localhost:9324", ...) as broker:
    await broker.publish(m, "another-queue")

NATS

async with NatsBroker("nats://localhost:4222") as broker:
    await broker.publish(m, "another-subject")
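
To make the lifecycle of the context-manager pattern concrete, here is a stdlib-only sketch. `InMemoryBroker` and `send_batch` are hypothetical stand-ins written for this example; they are not part of Propan and only mimic the assumed behaviour of connecting on entry and disconnecting on exit.

```python
import asyncio
from typing import List, Tuple


class InMemoryBroker:
    """Hypothetical stand-in for a broker; NOT part of Propan."""

    def __init__(self) -> None:
        self.connected = False
        self.sent: List[Tuple[str, str]] = []

    async def __aenter__(self) -> "InMemoryBroker":
        # A real broker would open its connection here
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # ...and close it here, ending anything bound to the connection
        self.connected = False

    async def publish(self, message: str, destination: str) -> None:
        if not self.connected:
            raise RuntimeError("broker is not connected")
        self.sent.append((destination, message))


async def send_batch(broker: InMemoryBroker, messages: List[str]) -> None:
    # One connection, many messages: the context stays open for the whole loop
    async with broker:
        for m in messages:
            await broker.publish(m, "another-channel")


broker = InMemoryBroker()
asyncio.run(send_batch(broker, ["a", "b", "c"]))
```

Once the `async with` block exits, the stand-in is disconnected and further `publish` calls fail, which mirrors why long-lived work should not be tied to a short-lived context.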

Tips

Within this context, you can send an unlimited number of messages and also wait synchronously for responses to them. However, handlers cannot be initialized within this context: they would stop running as soon as the context exits.

This will be described in more detail in the next section.