Mongo Change Streams, запущенные несколько раз (вид): приложение Node, работающее несколько экземпляров

Приложение My Node использует потоки изменений Mongo, и приложение запускает более 3 экземпляров в рабочем состоянии (в конечном итоге, так как это будет увеличиваться по мере роста). Таким образом, когда происходит изменение в функциональности потока изменений, выполняется столько раз, сколько происходит процессов.

Как настроить так, чтобы поток изменений запускался только один раз?

Вот что у меня есть:

const options = { fullDocument: "updateLookup" };

const filter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});

6 ответов

Решение

Хотя вариант Kafka звучит интересно, это была большая инфраструктурная работа на платформе, с которой я не знаком, поэтому я решил пойти с чем-то немного ближе к дому для меня, отправив сообщение MQTT в небольшое автономное приложение, и позволяя серверу MQTT отслеживать сообщения на предмет уникальности.

siteStream.on("change", async change => {
  console.log("in site change stream);
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});

Я все еще работаю над окончательной версией сервера MQTT, но метод оценки уникальности сообщений, вероятно, будет хранить массив идентификаторов потока изменений в памяти приложения, так как нет необходимости сохранять их и оценивать, следует ли продолжать какие-либо действия. далее на основании того, было ли это изменение идентификатора потока замечено ранее.

var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
  client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
  context = message.toString().replace(/"/g, "");
  if (seen.indexOf(context) < 0) {
    seen.push(context);
    // Do stuff
  }
});

Это не относится к безопасности и т. Д., Но вы поняли идею.

Сделать это с надежными гарантиями сложно, но возможно. Подробнее об одном решении я писал здесь: https://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html

Примеры написаны на Java, но важная часть - это алгоритм.

Все сводится к нескольким техникам:

  • Каждый процесс пытается получить блокировку
  • Каждому замку (или каждому изменению) соответствует жетон ограждения.
  • Обработка каждого изменения должна быть идемпотентной
  • Во время обработки изменения токен используется для обеспечения упорядоченных и однократных обновлений.

Подробнее в блоге.

Это легко сделать с помощью только операторов запросов Mongodb. Вы можете добавить запрос по модулю в поле идентификатора, где делителем является количество экземпляров вашего приложения (N). Остаток тогда является элементом {0, 1, 2, ..., N-1}. Если экземпляры вашего приложения пронумерованы в порядке возрастания от нуля до N-1, вы можете написать фильтр следующим образом:

const filter = [
  {
    "$match": {
      "$and": [
        // Other filters
        { "_id": { "$mod": [<number of instances>, <this instance's id>]}}
      ]
    }
  }
];

Похоже, вам нужен способ разделения обновлений между экземплярами. Вы смотрели в Apache Kafka? По сути, вы должны иметь одно приложение, которое записывает данные изменений в раздел Kafka Topic, и ваше приложение-узел будет потребителем Kafka. Это гарантирует, что только один экземпляр приложения получит обновление.

В зависимости от вашей стратегии секционирования, вы можете даже убедиться, что обновления для одной и той же записи всегда идут в одно и то же приложение узла (если вашему приложению необходимо поддерживать свое собственное состояние). В противном случае вы можете распространять обновления в режиме круговой проверки.

Самым большим преимуществом использования Kafka является то, что вы можете добавлять и удалять экземпляры без необходимости настройки конфигураций. Например, вы можете запустить один экземпляр, и он будет обрабатывать все обновления. Затем, как только вы запускаете другой экземпляр, каждый из них начинает обрабатывать половину нагрузки. Вы можете продолжить этот шаблон для всех экземпляров, сколько существует разделов (и вы можете настроить раздел так, чтобы иметь тысячи разделов, если хотите), что является мощью группы потребителей Kafka. Уменьшение работает в обратном порядке.

Будет ли это поле в БД под названием statusкоторый будет обновлен с помощью findAnUpdate на основе события, полученного из потока изменений. Допустим, вы получаете 2 события одновременно из потока изменений. Первое событие обновит статус наstart а другой выдаст ошибку, если статус start. Таким образом, второе событие не будет обрабатывать бизнес-логику.

Я не утверждаю, что это надежные решения промышленного уровня, но я считаю, что что-то подобное могло бы сработать.

Решение 1

применяя чтение-изменение-запись :

  1. Добавить поле в документ, все созданные документы имеют версию =0
  2. Получить событие ChangeStream
  3. Прочтите документ, который необходимо обновить
  4. Выполните обновление модели
  5. Версия приращения
  6. Обновите документ, в котором оба id а также version совпадение, в противном случае отменить изменение

Да, это создает 2 * n_application_replicas бесполезные запросы, поэтому есть другой вариант

Решение 2

  1. Создайте коллекцию ResumeTokens в mongo, которая будет хранить коллекцию → отображение токенов
  2. В коде обработчика changeStream после успешной записи обновите ResumeToken в коллекции
  3. Создайте переключатель функции, который отключит чтение ChangeStream в вашем приложении.
  4. Настройте только один экземпляр вашего приложения в качестве «читателя».

В случае сбоя «читателя» вы можете либо разрешить чтение на другом узле, либо повторно развернуть узел «читатель».

В результате: может быть бесконечное количество реплик, не считывающих читателя, и не будет никаких бесполезных запросов.

Другие вопросы по тегам