Message Publishing
Propan uses a unified method to send messages:
await broker.publish(message, ...)
Regardless of the broker, this method takes message as its first argument.
The remaining arguments are specific to each broker.
The features specific to your broker are covered in that broker's documentation.
Valid types to submit
Type | Send header | Method of casting to bytes |
---|---|---|
dict | application/json | json.dumps(message).encode() |
Sequence | application/json | json.dumps(message).encode() |
pydantic.BaseModel | application/json | message.json().encode() |
str | text/plain | message.encode() |
bytes | | message |
Also, some brokers support sending special types, which are described in the relevant section of your broker's documentation.
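The casting rules in the table can be sketched as a single function. This is only an illustration of the table, not Propan's actual implementation: the name encode_message is hypothetical, pydantic is treated as optional, and returning None as the header for bytes is an assumption based on the empty cell above.

```python
import json
from collections.abc import Sequence
from typing import Any, Optional, Tuple

try:  # pydantic is optional for this sketch
    from pydantic import BaseModel
except ImportError:
    BaseModel = None


def encode_message(message: Any) -> Tuple[bytes, Optional[str]]:
    """Hypothetical helper: return (body, content type) as in the table."""
    # bytes and str are Sequences too, so they must be checked first.
    if isinstance(message, bytes):
        return message, None  # assumption: no header is set for raw bytes
    if isinstance(message, str):
        return message.encode(), "text/plain"
    if BaseModel is not None and isinstance(message, BaseModel):
        return message.json().encode(), "application/json"
    if isinstance(message, (dict, Sequence)):
        return json.dumps(message).encode(), "application/json"
    raise TypeError(f"unsupported message type: {type(message).__name__}")


body, content_type = encode_message({"user": "john", "id": 1})
```

Checking bytes and str before Sequence matters here, because both are sequences in Python and would otherwise be serialized as JSON.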
Broker initialization
To send a message to a queue, you must first connect to it.
If you are inside a running Propan application, you don't need to do anything: the broker is already running. Just access it and send a message.
If you are only using Propan to send asynchronous messages from within another framework, you can use the broker as a context manager to send them.
async with RedisBroker("redis://localhost:6379") as broker:
    await broker.publish(m, "another-channel")
async with RabbitBroker("amqp://guest:guest@localhost:5672/") as broker:
    await broker.publish(m, "another-queue")
async with KafkaBroker("localhost:9092") as broker:
    await broker.publish(m, "another-topic")
async with SQSBroker("http://localhost:9324", ...) as broker:
    await broker.publish(m, "another-queue")
async with NatsBroker("nats://localhost:4222") as broker:
    await broker.publish(m, "another-subject")
Tips
Within this context, you can send an unlimited number of messages, as well as synchronously wait for a response to them.
However, handlers (functions registered with handle) cannot be initialized within this context: they will complete their execution along with the context.
This will be described in more detail in the next section.
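The lifecycle behind these tips can be pictured with a small in-memory stand-in. InMemoryBroker is a hypothetical class written for this sketch and is not part of Propan; it only assumes that a broker connects when the context is entered and disconnects when it exits, which is why anything tied to the context stops along with it.

```python
import asyncio
from typing import List, Tuple


class InMemoryBroker:
    """Hypothetical stand-in for a real broker; not part of Propan."""

    def __init__(self) -> None:
        self.connected = False
        self.sent: List[Tuple[str, str]] = []

    async def __aenter__(self) -> "InMemoryBroker":
        self.connected = True  # a real broker would open its connection here
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.connected = False  # and close it here, ending everything tied to it

    async def publish(self, message: str, destination: str) -> None:
        if not self.connected:
            raise RuntimeError("broker is not connected")
        self.sent.append((destination, message))


async def main() -> InMemoryBroker:
    async with InMemoryBroker() as broker:
        # Any number of messages can be sent while the context is open.
        for i in range(3):
            await broker.publish(f"message {i}", "another-queue")
    return broker


broker = asyncio.run(main())
```

Once the async with block exits, the stand-in refuses further publish calls, mirroring the point that work started inside the context does not outlive it.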