Как смотреть изменения потока в mongoDB и отправлять обновления на вызов ajax?

Я использую новую функцию MongoDB 3.6 watch () для отправки обновлений базы данных с сервера узла на ajax на стороне клиента.

Я создал веб-сервис, периодически запрашиваемый Ajax-вызовом. Между двумя последовательными вызовами ajax, я хочу, чтобы второй получал все обновления, которые произошли за это время. Я знаю, что должен возобновить поток изменений, как показано в официальной документации здесь: https://docs.mongodb.com/manual/changeStreams/. Однако я не выяснил, как применить это к моим конкретным потребностям, я имею в виду, в каком обратном вызове я могу обработать свои данные и отправить их в ответ веб-службы?

Вот часть моего серверного кода: server.js

const pipeline = [
  {
    $match : {
      "operationType" : "insert"
      ,
      "fullDocument.T" : { "$exists":true}
    }
  },
  {
    $project: { "fullDocument.ts": 1,
                "fullDocument.T":1}
  }
];

function getLiveData(handler){

console.log("in getLiveData");
var liveArray=[];

var resumeToken;

const changeStream = dbObject.collection('status').watch(pipeline);
changeStream.hasNext(function(err, change) {
  if (err) return console.log(err);
  expect(err).to.equal(null);
  expect(change).to.exist;
  console.log("in changeStream.hasNext");

  changeStream.next(function(err, change) {
    if (err) return console.log(err);
    expect(err).to.equal(null);
    console.log("in changeStream.next");
    resumeToken = change._id;

    expect(change._id).to.exist;
    expect(changeStream.resumeToken).to.exist;

    changeStream.close(function(err) {
      if (err) return console.log(err);
      expect(err).to.equal(null);
      console.log("in changeStream.close");
      const newChangeStream = dbObject.collection('status').watch({ resumeAfter: resumeToken });

      newChangeStream.next(function(err, next) {
        if (err) return console.log(err);
        expect(err).to.equal(null);
        expect(next).to.exist;
        console.log("in newChangeStream.next");


        //my own code
        newChangeStream.on("change", function(change) {
        console.log('in change stream, change : ',change);
        liveArray.push([change.fullDocument.ts, change.fullDocument.T]);

        var response = {
            "liveArray" : liveArray
        };

        console.log("from getLiveData : " , response);

        handler(response);
      });   
        //my own code

        // Since changeStream has an implicit seession,
        // we need to close the changeStream for unit testing purposes
        newChangeStream.close();
      });
    });
  });
});

}

часть веб-сервиса:

 app.get("/liveDataRequest", function(req, res){        
      getLiveData(function(data){
        console.log("in handler", data);
        res.status(200).send(data);           
      });

И вот, как мы видим, консольный журнал, часть, где я обрабатываю свои данные, никогда не вызывается:

in getLiveData
in changeStream.hasNext
in changeStream.next
in changeStream.close
in newChangeStream.next
in getLiveData
in changeStream.hasNext
in changeStream.next
in changeStream.close
in newChangeStream.next

0 ответов

Другие вопросы по тегам