MongoDB меняет время ожидания потока, если база данных не работает в течение некоторого времени
Я использую поток изменений mongoDB в nodejs, все работает нормально, но если база данных не работает, потребовалось более 10 5 секунд, чтобы подняться, ошибка истечения времени ожидания выброса потока изменений, вот мой код наблюдателя потока изменений
Service.prototype.watcher = function( db ){
let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;
changeStream.on('change', next => {
resumeToken = next._id;
console.log('data is ', JSON.stringify(next))
changeStream.close();
// console.log('resumeToken is ', JSON.stringify(resumeToken))
newChangeStream = collection.watch({ resumeAfter : resumeToken });
newChangeStream.on('change', next => {
console.log('insert called ', JSON.stringify( next ))
});
});
однако в конце базы данных я обработал это, то есть, если база данных отключена или переподключена с использованием этого кода
this.db.on('reconnected', function () {
console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
console.warn('MongoDB disconnected!');
});
но я не могу обработать наблюдатель потока изменений, чтобы остановить его, когда база данных не работает, и запустить его снова, когда база данных переподключена, или если есть какой-либо другой лучший способ сделать это?
2 ответа
Что вы хотите сделать, это заключить в капсулу watch()
вызвать функцию. Затем эта функция вызывает себя при ошибке, чтобы пересмотреть коллекцию, используя ранее сохраненный токен возобновления. В вашем коде отсутствует обработчик ошибок. Например:
const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null
run()
function watch_collection(con, db, coll) {
console.log(new Date() + ' watching: ' + coll)
con.db(db).collection(coll).watch({resumeAfter: resume_token})
.on('change', data => {
console.log(data)
resume_token = data._id
})
.on('error', err => {
console.log(new Date() + ' error: ' + err)
watch_collection(con, coll)
})
}
async function run() {
con = await MongoClient.connect(uri, {"useNewUrlParser": true})
watch_collection(con, 'test', 'test')
}
Обратите внимание, что watch_collection()
содержит watch()
метод вместе с его обработчиком. При изменении он распечатает изменения и сохранит токен возобновления. В случае ошибки он снова вызывает себя для повторного просмотра коллекции.
Это решение, которое я разработал, просто добавьте функцию stream.on(error), чтобы она не зависала при возникновении ошибки, перезапустите поток при переподключении базы данных, а также сохраните токен возобновления в файле для каждого события, это полезно, когда приложение происходит сбой или остановка, и вы запускаете снова, и в течение этого времени, если было добавлено x количество записей, поэтому при перезапуске приложения просто получите последний токен возобновления из файла и запустите оттуда наблюдатель, после чего все вставленные записи будут вставлены, и, следовательно, запись не будет пропустил, вот код ниже
var rsToken ;
try {
rsToken = await this.getResumetoken()
} catch (error) {
rsToken = null ;
}
if (!rsToken)
changeStream = collection.watch({ fullDocument: 'updateLookup' });
else
changeStream = collection.watch({ fullDocument: 'updateLookup', resumeAfter : rsToken });
changeStream.on('change', next => {
resumeToken = next._id;
THIS.saveTokenInfile(resumeToken)
cs_processor.process( next )
});
changeStream.on('error', err => {
console.log('changestream error ')
})