Отправка сообщений¶
В Propan для отправки сообщений используется унифицированный метод
await broker.publish(message, ...)
Этот метод, независимо от брокера, принимает первым аргументом message. Однако, остальные аргументы являются
специфичными для разных брокеров.
Ознакомиться со всеми особенностями, специфичными для вашего брокеры, вы можете здесь:
Допустимые типы для отправки¶
| Тип | Заголовок при отправке | Способ приведения к bytes |
|---|---|---|
dict |
application/json | json.dumps(message).encode() |
Sequence |
application/json | json.dumps(message).encode() |
pydantic.BaseModel |
application/json | message.json().encode() |
str |
text/plain | message.encode() |
bytes |
message |
Также, некоторые брокеры поддерживают отправку специальных типов, которые описаны в соотвествующем разделе документации вашего брокера.
Инициализация брокера перед отправкой¶
Для того, чтобы отправить сообщение в очерендь, необходимо сначала подключиться к ней.
Если вы находитесь внутри запущенного Propan приложения, вам не нужно ничего делать: брокер уже запущен. Просто получите к нему доступ и отправьте сообщение.
1 2 3 4 5 6 7 8 | |
1 2 3 4 5 6 7 8 | |
1 2 3 4 5 6 7 8 | |
1 2 3 4 5 6 7 8 | |
1 2 3 4 5 6 7 8 | |
Если же вы используете Propan только для отправки асинхронных сообщений в рамках другого фреймворка, вы можете использовать брокер в качестве контекстного менеджера для отправки.
async with RedisBroker("redis://localhost:6379") as broker:
await broker.publish(m, "another-channel")
async with RabbitBroker("amqp://guest:guest@localhost:5672/") as broker:
await broker.publish(m, "another-queue")
async with KafkaBroker("localhost:9092") as broker:
await broker.publish(m, "another-topic")
async with SQSBroker("http://localhost:9324", ...) as broker:
await broker.publish(m, "another-queue")
async with NatsBroker("nats://localhost:4222") as broker:
await broker.publish(m, "another-subject")
Tip
В рамках этого контекста вы можете отправлять неограниченное число сообщений, а также синхронно ожидать ответ на них.
Однако, инициализировать handle'ы в рамках этого контекста нельзя: они завершат свое выполнение вместе с контекстом.
Подробнее это будет разобрано в следующем разделе.