Будет ли огромное количество обратных вызовов сломать сценарий или продолжится, когда будет достаточно оперативной памяти?

У меня есть функция, которая выбирает идентификаторы потоков (разговоров Gmail) из базы данных, а затем запрашивает у API Google все данные для каждого идентификатора потока. Получив объект потока, он сохраняет его в базе данных. Это прекрасно работает для моей входящей почты, которая имеет ~1k сообщений. Но я не уверен, что это будет работать для учетных записей с более чем 100k сообщений.

Теперь то, что я спрашиваю, когда машина исчерпает память, она сломается, или она продолжит выполнять функции обратного вызова, когда снова будет достаточно оперативной памяти? Должен ли я изменить этот код, чтобы сделать это по частям (перезапустить весь сценарий в некоторых точках и продолжить с новой оперативной памяти от того, где он последний раз закончился?)

function eachThread(auth) {
  var gmail = google.gmail('v1');

  MongoClient.connect(mongoUrl, function(err, db){
    assert.equal(null, err);
    var collection = db.collection('threads');
    // Find all data in collection and convert it to array
    collection.find().toArray(function(err, docs){
      assert.equal(null, err);
      var threadContents = [];
      // For each doc in array...
      for (var i = 0; i < docs.length; i++) {
        gmail
        .users
        .threads
        .get( {auth:auth,'userId':'me', 'id':docs[i].id}, function(err, resp){
          assert.equal(null, err);
          threadContents.push(resp);
          console.log(threadContents.length);
          console.log(threadContents[threadContents.length - 1].id);
          var anotherCollection = db.collection('threadContents');
          anotherCollection.updateOne(
            {id: threadContents[threadContents.length - 1].id},
            threadContents[threadContents.length - 1],
            {upsert:true},
            function(err, result){
              assert.equal(null, err);
              console.log('updated one.');
          });
          if (threadContents.length === docs.length) {
            console.log('Length matches!');
            db.close();
          }
        });//end(callback(threads.get))
      }//end(for(docs.length))
    });//end(find.toArray)
  });//end(callback(mongo.connect))
}//end(func(eachThread))

3 ответа

Решение

Вам не хватит памяти, если вы не получите все и поместите его в массив. Также я бы не стал создавать экземпляры объектов, которые одинаковы для каждого элемента в цикле.

Вот пример кода, который не будет исчерпан из памяти, однако это запуск и забывание, что означает, что вы не получите обратный вызов после его завершения и т. Д. Если вы хотите сделать это, вам нужно будет использовать promises / async.

// Fire-and-forget type of function
// Will not run out of memory, GC will take care of that
function eachThread(auth, cb) {
  var gmail = google.gmail('v1');

  MongoClient.connect(mongoUrl, (err, db) => {
    if (err) {
      return cb(err);
    }

    var threadsCollection = db.collection('threads').find();
    var contentsCollection = db.collection('threadContents');

    threadsCollection.on('data', (doc) => {
      gmail.users.threads.get({ auth: auth, 'userId': 'me', 'id': doc.id }, (err, res) => {
        if (err) {
          return cb(err);
        }

        contentsCollection.updateOne({ id: doc.id }, res, { upsert: true }, (err, result) => {
          if (err) {
            return cb(err);
          }
        });
      });
    });

    threadsCollection.on('end', () => { db.close() });
  });
}

Замена вашего for цикл по async.mapLimit достаточно, чтобы добавить деталь по функциональности. Я также позволил себе anotherCollection создание рядом collectionПоскольку открытие соединения один раз лучше, чем его открытие, если не тысячи раз.

Я также заменил ваш assert.equal от callback(err), asyncФункция поймет, что она должна остановить все, и позволит вам чисто выйти, а не выдавать исключение.

РЕДАКТИРОВАТЬ:

Как заметил @chernando, используя collection.find().toArray импортирует всю коллекцию в оперативную память. Лучшим способом выполнения части за частью будет потоковая передача данных или обращение к БД с просьбой передать данные по частям.

Эта версия предполагает, что у вас достаточно оперативной памяти, чтобы получить collection.find().toArray работает без проблем.

Я, вероятно, вернусь позже с адаптацией инструмента, о котором я говорил в комментариях, когда у меня будет время.

var async = require('async');

function eachThread(auth) {
  var gmail = google.gmail('v1'),
      limit = 100; //Size of the parts

  MongoClient.connect(mongoUrl, function(err, db){
    assert.equal(null, err);
    var collection = db.collection('threads'),
        anotherCollection = db.collection('threadContents');
    // Find all data in collection and convert it to array
    collection.find().toArray(function(err, docs){
      assert.equal(null, err);
      var threadContents = [];
//Change here
      async.mapLimit(docs, limit, (doc, callback) => {
        gmail
        .users
        .threads
        .get( {auth:auth,'userId':'me', 'id':docs[i].id}, function(err, resp){
          if(err) {
            return callback(err);
          }
          threadContents.push(resp);
          console.log(threadContents.length);
          console.log(threadContents[threadContents.length - 1].id);
          anotherCollection.updateOne(
            {id: threadContents[threadContents.length - 1].id},
            threadContents[threadContents.length - 1],
            {upsert:true},
            function(err, result){
              if(err) {
                console.error(err);
              } else {
                console.log('updated one.');
              }
              callback(err);
          });
        });//end(callback(threads.get))
//Change here
      }, (error) => {
        if(error) {
          console.error('Transfert stopped because of error:' + err);
        } else {
          console.log('Transfert successful');
        }
      });//end(async.mapLimit)
    });//end(find.toArray)
  });//end(callback(mongo.connect))
}//end(func(eachThread))

Теперь то, что я спрашиваю, когда машина исчерпает память, она сломается, или она продолжит выполнять функции обратного вызова, когда снова будет достаточно оперативной памяти?

Если у вас не хватит памяти, ОС убьет ваш процесс. В Linux вы увидите OOM (Out of Memory). Так что да, это сломается.

В этих сценариях вы можете рассмотреть возможность использования потоков или генераторов, чтобы сохранить в памяти только тот кусок данных, который вам нужно обработать.

В твоем случае MongoDB обеспечивает потоки на find метод https://mongodb.github.io/node-mongodb-native/2.0/tutorials/streams/

Примерно так должно работать:

var collection = db.collection('threads');
var cursor = collection.find()

cursor.on('data', function(doc) {
  gmail
  .users
  .threads
  .get( {auth:auth,'userId':'me', 'id': doc.id}, function(err, resp) {
    ...
  })
})
Другие вопросы по тегам