Перейти к содержанию

NATS JetStream

Обычное использование NATS идеально подходит для сценариев, в которых:

  • publisher и consumer всегда находятся онлайн;
  • система допускает потерю сообщений.

Если вам нужны более строгие ограничения, а именно:

  • наличие механизма подтверждения обработки сообщений (ack/nack);
  • персистентность сообщений (при отсутствии consumer'а сообщения будут накапливаться в очереди).

Вам следует использование расширение NATS JetStream.

На самом деле расширение JetStream - это тот же самый NATS с добавлением персистентного слоя над файловой системой, который обеспечивает хранение сообщений в очереди. Поэтому все интерфейсы публикации и потребления сообщений аналогичны обычному использованию NATS.

Однако, сама логика работы слоя JetStream имеет множество возможностей для конфигурации: от политики удаления старых сообщений до ограничения на максимальное число хранимых сообщений. Подробно со всеми возможностями JetStream вы можете ознакомиться в официальной документации.

Если вы работали с другими брокерами сообщений, то вам следует знать, что логика работы JS ближе к Kafka, нежели к RabbitMQ: сообщения после подтверждения их обработки не удаляются из очереди, а остаются там до тех пор, пока очередь не наполнится и не начнет удалять старые сообщения (либо в соответсвии с другой логикой, которую вы можете сконфигурировать сами).

При подключении consumer'а (и, особенно, при переподключении) вы должны сами определить, в соотвествии с какой логикой он будет потреблять сообщения: с самого начала, начиная с какого-то сообщения, начиная с какого-то времени, только новые и т.д. Не удивляйтесь, если при восстановлении соединения ваш consumer начнет заново обрабатывать все сообщения, полученные ранее - вы просто не определили это правило.

Также NATS JetStream имеет встроенное key-value(cхоже с Redis) и object(схоже с Minio) хранилища, которые, помимо своего базового интерфейса положить/прочитать имеют возможность подписки на события, что может быть крайне полезно во многих сценариях.

Propan не предоставляет доступ к этому функционалу напрямую, однако он покрывается используемой библиотекой nats-py. Доступ к объекту JS вы можете получить из контекста приложения:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from propan import PropanApp, NatsJSBroker
from propan.annotations import NatsJS

broker = NatsJSBroker()
app = PropanApp(broker)

@app.after_startup
async def example(js: NatsJS):
    # JS Key-Value Storage
    storage = await js.create_key_value(bucket="propan_kv")

    await storage.put("hello", b"propan!")
    assert (await storage.get("hello")) == b"propan!"

    # JS Object Storage
    storage = await js.create_object_store("propan-obs")

    obj_name = "file.mp4"
    with open(obj_name) as f:
        await storage.put(obj_name, f)

    with open(f"copy-{obj_name}") as f:
        await storage.get(obj_name, f)