Как смотреть изменения потока в mongoDB и отправлять обновления на вызов ajax?
Я использую новую функцию MongoDB 3.6 watch () для отправки обновлений базы данных с сервера узла на ajax на стороне клиента.
Я создал веб-сервис, периодически запрашиваемый Ajax-вызовом. Между двумя последовательными вызовами ajax, я хочу, чтобы второй получал все обновления, которые произошли за это время. Я знаю, что должен возобновить поток изменений, как показано в официальной документации здесь: https://docs.mongodb.com/manual/changeStreams/. Однако я не выяснил, как применить это к моим конкретным потребностям, я имею в виду, в каком обратном вызове я могу обработать свои данные и отправить их в ответ веб-службы?
Вот часть моего серверного кода: server.js
const pipeline = [
{
$match : {
"operationType" : "insert"
,
"fullDocument.T" : { "$exists":true}
}
},
{
$project: { "fullDocument.ts": 1,
"fullDocument.T":1}
}
];
function getLiveData(handler){
console.log("in getLiveData");
var liveArray=[];
var resumeToken;
const changeStream = dbObject.collection('status').watch(pipeline);
changeStream.hasNext(function(err, change) {
if (err) return console.log(err);
expect(err).to.equal(null);
expect(change).to.exist;
console.log("in changeStream.hasNext");
changeStream.next(function(err, change) {
if (err) return console.log(err);
expect(err).to.equal(null);
console.log("in changeStream.next");
resumeToken = change._id;
expect(change._id).to.exist;
expect(changeStream.resumeToken).to.exist;
changeStream.close(function(err) {
if (err) return console.log(err);
expect(err).to.equal(null);
console.log("in changeStream.close");
const newChangeStream = dbObject.collection('status').watch({ resumeAfter: resumeToken });
newChangeStream.next(function(err, next) {
if (err) return console.log(err);
expect(err).to.equal(null);
expect(next).to.exist;
console.log("in newChangeStream.next");
//my own code
newChangeStream.on("change", function(change) {
console.log('in change stream, change : ',change);
liveArray.push([change.fullDocument.ts, change.fullDocument.T]);
var response = {
"liveArray" : liveArray
};
console.log("from getLiveData : " , response);
handler(response);
});
//my own code
// Since changeStream has an implicit seession,
// we need to close the changeStream for unit testing purposes
newChangeStream.close();
});
});
});
});
}
часть веб-сервиса:
app.get("/liveDataRequest", function(req, res){
getLiveData(function(data){
console.log("in handler", data);
res.status(200).send(data);
});
И вот, как мы видим, консольный журнал, часть, где я обрабатываю свои данные, никогда не вызывается:
in getLiveData
in changeStream.hasNext
in changeStream.next
in changeStream.close
in newChangeStream.next
in getLiveData
in changeStream.hasNext
in changeStream.next
in changeStream.close
in newChangeStream.next