Перейти к содержанию

Отправка сообщений

В Propan для отправки сообщений используется унифицированный метод

await broker.publish(message, ...)

Этот метод, независимо от брокера, принимает первым аргументом message. Однако, остальные аргументы являются специфичными для разных брокеров.

Ознакомиться со всеми особенностями, специфичными для вашего брокеры, вы можете здесь:

Допустимые типы для отправки

Тип Заголовок при отправке Способ приведения к bytes
dict application/json json.dumps(message).encode()
Sequence application/json json.dumps(message).encode()
pydantic.BaseModel application/json message.json().encode()
str text/plain message.encode()
bytes message

Также, некоторые брокеры поддерживают отправку специальных типов, которые описаны в соотвествующем разделе документации вашего брокера.

Инициализация брокера перед отправкой

Для того, чтобы отправить сообщение в очерендь, необходимо сначала подключиться к ней.

Если вы находитесь внутри запущенного Propan приложения, вам не нужно ничего делать: брокер уже запущен. Просто получите к нему доступ и отправьте сообщение.

1
2
3
4
5
6
7
8
from propan import PropanApp, RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-channel")
1
2
3
4
5
6
7
8
from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-queue")
1
2
3
4
5
6
7
8
from propan import PropanApp, KafkaBroker

broker = KafkaBroker("localhost:9092")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-topic")
1
2
3
4
5
6
7
8
from propan import PropanApp, SQSBroker

broker = SQSBroker("http://localhost:9324", ...)
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-queue")
1
2
3
4
5
6
7
8
from propan import PropanApp, NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = PropanApp(broker)

@broker.handle("test")
async def handle(m: str):
    await broker.publish(m, "another-subject")

Если же вы используете Propan только для отправки асинхронных сообщений в рамках другого фреймворка, вы можете использовать брокер в качестве контекстного менеджера для отправки.

async with RedisBroker("redis://localhost:6379") as broker:
    await broker.publish(m, "another-channel")
async with RabbitBroker("amqp://guest:guest@localhost:5672/") as broker:
    await broker.publish(m, "another-queue")
async with KafkaBroker("localhost:9092") as broker:
    await broker.publish(m, "another-topic")
async with SQSBroker("http://localhost:9324", ...) as broker:
    await broker.publish(m, "another-queue")
async with NatsBroker("nats://localhost:4222") as broker:
    await broker.publish(m, "another-subject")

Tip

В рамках этого контекста вы можете отправлять неограниченное число сообщений, а также синхронно ожидать ответ на них. Однако, инициализировать handle'ы в рамках этого контекста нельзя: они завершат свое выполнение вместе с контекстом.

Подробнее это будет разобрано в следующем разделе.