Как буферизовать вставки MongoDB при отключении в node.js?

Мы читаем файл XML (используя xml-stream) с около 500k элементов и вставьте их в MongoDB следующим образом:

xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));

Вставить в writeDataToDb(type, obj) выглядит так:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });

Теперь, когда соединение Mongo разрывается, поток xml все еще читает, а консоль заполняется сообщениями об ошибках (невозможно вставить, отключено, EPIPE поврежден, ...).

В документах сказано:

Когда вы закрываете процесс mongod, драйвер останавливает операции обработки и продолжает их буферизовать, поскольку bufferMaxEntries по умолчанию равен -1, что означает буферизацию всех операций.

Что на самом деле делает этот буфер?

Мы замечаем, что когда мы вставляем данные и закрываем сервер Монго, все буферизируется, затем мы возвращаем сервер Монго в резервное состояние, собственный драйвер успешно переподключается, и узел возобновляет вставку данных, но буферные документы (во время автономной работы Монго) не вставляются. снова.

Поэтому я ставлю под сомнение этот буфер и его использование.

Цель:

Мы ищем лучший способ сохранить вставки в буфере, пока монго не вернется (в 15000 миллисекунд согласно wtimeout) и затем вставьте буферизованные документы или воспользуйтесь xml.pause(); а также xml.resume() что мы пытались без успеха.

В основном нам нужна небольшая помощь в том, как обрабатывать разъединения без потери данных или прерываний.

2 ответа

Вставка 500K элементов с помощью insertOne() - очень плохая идея. Вместо этого вы должны использовать массовые операции, которые позволяют вам вставить много документов в одном запросе. (здесь, например, 10000, так что это можно сделать за 50 отдельных запросов). Чтобы избежать проблемы с буферизацией, вы можете обработать ее вручную:

  1. Отключить буферизацию с bufferMaxEntries: 0
  2. Установите свойства переподключения: reconnectTries: 30, reconnectInterval: 1000
  3. Создайте массовую операцию и накормите ее 10000 элементами
  4. Приостановка чтения XML. Попробуйте вставить 10000 предметов. Если это не удается, повторите каждые 3000 мсек, пока это не будет успешно
  5. Вы можете столкнуться с некоторыми проблемами с дубликатами ID, если массовая операция была прервана во время выполнения, поэтому игнорируйте их (код ошибки: 11000)

Вот пример сценария:

var fs = require('fs')
var Xml = require('xml-stream')

var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'

MongoClient.connect(url, {
  reconnectTries: 30,
  reconnectInterval: 1000,
  bufferMaxEntries: 0
}, function (err, db) {
  if (err != null) {
    console.log('connect error: ' + err)
  } else {
    var collection = db.collection('product')
    var bulk = collection.initializeUnorderedBulkOp()
    var totalSize = 500001
    var size = 0

    var fileStream = fs.createReadStream('data.xml')
    var xml = new Xml(fileStream)
    xml.on('endElement: product', function (product) {
      bulk.insert(product)
      size++
      // if we have enough product, save them using bulk insert
      if (size % 10000 == 0) {
        xml.pause()
        bulk.execute(function (err, result) {
          if (err == null) {
            bulk = collection.initializeUnorderedBulkOp()
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
            xml.resume()
          } else {
            console.log('bulk insert failed: ' + err)
            counter = 0
            var retryInsert = setInterval(function () {
              counter++
              bulk.execute(function (err, result) {
                if (err == null) {
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else if (err.code === 11000) { // ignore duplicate ID error
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else {
                  console.log('failed after first try: ' + counter, 'error: ' + err)
                }
              })
            }, 3000) // retry every 3000ms until success
          }
        })
      } else if (size === totalSize) {
        bulk.execute(function (err, result) {
          if (err == null) {
            db.close()
          } else {
            console.log('bulk insert failed: ' + err)
          }
        })
      }
    })
  }
})

пример журнала вывода:

doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]

Я не знаю конкретно о драйвере Mongodb и этом буфере записей. Может быть, он хранит данные только в определенных сценариях.

Поэтому я отвечу на этот вопрос более общим подходом, который может работать с любой базой данных.

Подводя итог, у вас есть две проблемы:

  1. Вы не восстанавливаетесь после неудачных попыток
  2. Поток XML отправляет данные слишком быстро

Чтобы справиться с первой проблемой, вам необходимо реализовать алгоритм повторных попыток, который обеспечит много попыток, прежде чем сдаться.

Чтобы справиться со вторым вопросом, вам нужно реализовать обратное давление на поток XML. Вы можете сделать это, используя pause метод, resume метод и входной буфер.

var Promise = require('bluebird');
var fs = require('fs');
var Xml = require('xml-stream');

var fileStream = fs.createReadStream('myFile.xml');
var xml = new Xml(fileStream);

// simple exponential retry algorithm based on promises
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) {
    var delay = initialDelay;
    var retry = 0;
    var closure = function() {
        return task().catch(function(error) {
            retry++;
            if (retry > maxRetry) {
                throw error
            }
            var promise = Promise.delay(delay).then(closure);
            delay = Math.min(delay * 2, maxDelay);
            return promise;
        })
    };
    return closure();
}

var maxPressure = 100;
var currentPressure = 0;
var suspended = false;
var stopped = false;
var buffer = [];

// handle back pressure by storing incoming tasks in the buffer
// pause the xml stream as soon as we have enough tasks to work on
// resume it when the buffer is empty
function writeXmlDataWithBackPressure(product) {
    // closure used to try to start a task
    var tryStartTask = function() {
        // if we have enough tasks running, pause the xml stream
        if (!stopped && !suspended && currentPressure >= maxPressure) {
            xml.pause();
            suspended = true;
            console.log("stream paused");
        }
        // if we have room to run tasks
        if (currentPressure < maxPressure) {
            // if we have a buffered task, start it
            // if not, resume the xml stream
            if (buffer.length > 0) {
                buffer.shift()();
            } else if (!stopped) {
                try {
                    xml.resume();
                    suspended = false;
                    console.log("stream resumed");
                } catch (e) {
                    // the only way to know if you've reached the end of the stream
                    // xml.on('end') can be triggered BEFORE all handlers are called
                    // probably a bug of xml-stream
                    stopped = true;
                    console.log("stream end");
                }
            }
        }
    };

    // push the task to the buffer
    buffer.push(function() {
        currentPressure++;
        // use exponential retry to ensure we will try this operation 100 times before giving up
        exponentialRetry(function() {
            return writeDataToDb(product)
        }, 100, 2000, 100).finally(function() {
            currentPressure--;
            // a task has just finished, let's try to run a new one
            tryStartTask();
        });
    });

    // we've just buffered a task, let's try to run it
    tryStartTask();
}

// write the product to database here :)
function writeDataToDb(product) {
    // the following code is here to create random delays and random failures (just for testing)
    var timeToWrite = Math.random() * 100;
    var failure = Math.random() > 0.5;
    return Promise.delay(timeToWrite).then(function() {
        if (failure) {
            throw new Error();
        }
        return null;
    })
}

xml.on('endElement: product', writeXmlDataWithBackPressure);

Поиграй с этим, поставь немного console.log чтобы понять, как он себя ведет. Я надеюсь, что это поможет вам решить вашу проблему:)

Другие вопросы по тегам