MongoDB меняет время ожидания потока, если база данных не работает в течение некоторого времени

Я использую поток изменений mongoDB в nodejs, все работает нормально, но если база данных не работает, потребовалось более 10 5 секунд, чтобы подняться, ошибка истечения времени ожидания выброса потока изменений, вот мой код наблюдателя потока изменений

Service.prototype.watcher = function( db ){

let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;

changeStream.on('change', next => {
    resumeToken = next._id;
    console.log('data is ', JSON.stringify(next))
    changeStream.close();
    // console.log('resumeToken is ', JSON.stringify(resumeToken))
    newChangeStream = collection.watch({ resumeAfter : resumeToken });
    newChangeStream.on('change', next => {
        console.log('insert called ', JSON.stringify( next ))
    });
});

однако в конце базы данных я обработал это, то есть, если база данных отключена или переподключена с использованием этого кода

 this.db.on('reconnected', function () {
    console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
    console.warn('MongoDB disconnected!');
});

но я не могу обработать наблюдатель потока изменений, чтобы остановить его, когда база данных не работает, и запустить его снова, когда база данных переподключена, или если есть какой-либо другой лучший способ сделать это?

2 ответа

Что вы хотите сделать, это заключить в капсулу watch() вызвать функцию. Затем эта функция вызывает себя при ошибке, чтобы пересмотреть коллекцию, используя ранее сохраненный токен возобновления. В вашем коде отсутствует обработчик ошибок. Например:

const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null

run()

function watch_collection(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)
  con.db(db).collection(coll).watch({resumeAfter: resume_token})
    .on('change', data => {
      console.log(data)
      resume_token = data._id
    })
    .on('error', err => {
      console.log(new Date() + ' error: ' + err)
      watch_collection(con, coll)
    })
}

async function run() {
  con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_collection(con, 'test', 'test')
}

Обратите внимание, что watch_collection() содержит watch() метод вместе с его обработчиком. При изменении он распечатает изменения и сохранит токен возобновления. В случае ошибки он снова вызывает себя для повторного просмотра коллекции.

Это решение, которое я разработал, просто добавьте функцию stream.on(error), чтобы она не зависала при возникновении ошибки, перезапустите поток при переподключении базы данных, а также сохраните токен возобновления в файле для каждого события, это полезно, когда приложение происходит сбой или остановка, и вы запускаете снова, и в течение этого времени, если было добавлено x количество записей, поэтому при перезапуске приложения просто получите последний токен возобновления из файла и запустите оттуда наблюдатель, после чего все вставленные записи будут вставлены, и, следовательно, запись не будет пропустил, вот код ниже

var rsToken ;
    try {
        rsToken = await this.getResumetoken()
    } catch (error) {
        rsToken = null ;
    }

    if (!rsToken)
        changeStream = collection.watch({ fullDocument: 'updateLookup' });
    else 
        changeStream = collection.watch({ fullDocument: 'updateLookup', resumeAfter : rsToken  });

    changeStream.on('change', next => {

        resumeToken = next._id;
        THIS.saveTokenInfile(resumeToken)

        cs_processor.process( next )


    });  
    changeStream.on('error', err => {
        console.log('changestream error ')
    })
Другие вопросы по тегам