Как ждать потока, чтобы закончить трубопровод? (Nodejs)

У меня есть массив обещаний цикла for, поэтому я использовал Promise.all, чтобы просмотреть их, а затем вызвал их.

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

Promise.all(promises).then((responses) => {
  for (let i = 0; i < promises.length; i++) {
    if (promise.property === something) {
      //do something
    } else {
      let file = fs.createWriteStream('./hello.pdf');
      let stream = responses[i].pipe(file);
      /*
         I WANT THE PIPING AND THE FOLLOWING CODE 
         TO RUN BEFORE NEXT ITERATION OF FOR LOOP
      */
      stream.on('finish', () => {
        //extract the text out of the pdf
        extract(filePath, {splitPages: false}, (err, text) => {
        if (err) {
          console.log(err);
        } else {
          arrayOfDocuments[i].text_contents = text;
        }
      });
    });    
  }
}

Обещание1, обещание2 и обещание3 - это некоторые запросы http, и если один из них является приложением /pdf, я записываю его в поток и анализирую текст. Но этот код запускает следующую итерацию перед анализом теста из PDF. Есть ли способ заставить код ждать, пока конвейер и поток не будут завершены, прежде чем перейти к следующей итерации?

5 ответов

Без асинхронности / ожидания это довольно неприятно. С помощью async/await просто сделайте это:

Promise.all(promises).then(async (responses) => {
  for (...) {
    await new Promise(fulfill => stream.on("finish", fulfill));
    //extract the text out of the PDF
  }
})

Что-то вроде следующего также будет работать. Я использую этот шаблон довольно часто:

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

function doNext(){
  if(!promises.length) return;
  promises.shift().then((resolved) =>{
    if(resolved.property === something){
      ...
      doNext();
    }else{
      let file = fs.createWriteStream('./hello.pdf');
      let stream = resolved.pipe(file);
      stream.on('finish', () =>{
        ...
        doNext();
      });
    }

  })
}
doNext();

или разбить обработчик на контроллер и обработчик Promisified:

function streamOrNot(obj){
  return new Promise(resolve, reject){
    if(obj.property === something){
      resolve();
      return;
    }
    let file = fs.createWriteStream...;
    stream.on('finish', () =>{
      ...
      resolve();
    });
  }
}

function doNext(){
  if(!promises.length) return;
  return promises.shift().then(streamOrNot).then(doNext);
}

doNext()

Использовать await с stream.pipeline() вместо stream.pipe():

      import * as StreamPromises from "stream/promises";

...
await StreamPromises.pipeline(sourceStream, destinationStream);

Вы можете написать часть else внутри собственной функции. Так что обработка потока будет происходить параллельно

(function(i) {
    let file = fs.createWriteStream('./hello.pdf');
    let stream = responses[i].pipe(file);
  /*
     I WANT THE PIPING AND THE FOLLOWING CODE 
     TO RUN BEFORE NEXT ITERATION OF FOR LOOP
  */
    stream.on('finish', () => {
      //extract the text out of the pdf
      extract(filePath, {splitPages: false}, (err, text) => {
      if (err) {
        console.log(err);
      } 
      else {
        arrayOfDocuments[i].text_contents = text;
      }
    });
  });    
})(i) 

В противном случае вы можете обрабатывать потоковую часть как часть самого исходного / индивидуального обещания.

На данный момент вы создаете обещание и добавляете его в массив, вместо того, чтобы добавлять в массив promise.then (который также является обещанием). А внутри обработчика вы делаете потоковые вещи.

Я считаю, что вы могли бы просто сделать это для Node.js 15+ после того, как прикрепите обратный вызов кfinishсобытие...

      const { finished } = require('node:stream/promises');

await finished(stream);

https://nodejs.org/api/stream.html#streamfinishedstream-options

Другие вопросы по тегам