Node Coroutines Параллельное управление потоком с генераторами и Promise

Я пытаюсь имитировать поток управления библиотеки async.js с сопрограммами и обещаниями, используя оба co а также bluebird.js но я сталкиваюсь с некоторыми проблемами. Мой код выглядит следующим образом, хотя это в основном псевдо-код, потому что реальный код был бы очень длинным, я могу добавить этот код позже, если потребуется...

 co(function*(){  
    var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
    var doc = yield People.findOne({email:  req.body.email}).exec();

    var filePath = path.join(__dirname, '../email-template.html');                         
    var html = yield fs.readFileAsync(filePath,'utf8');
    var emailsToSend = [];
    var emailStatuses = [];
    var validEmails = [];

    //make sure email is ok
    req.body.messagesToSend.forEach(function(message){
      if(message.email != null && re.test(message.email))
      { 
        validEmails.push(message);
      }else{
        // mark it as failed...
        emailStatuses.push({success : "FAILURE", email : message.email}); 
      }
    });

    yield Promise.all( validEmails, Promise.coroutine(function * (message){
      try{
        var person = yield People.findOne({email:  message.email }).exec();

        if(person){ 
          emailStatuses.push({status : "Already exists", email : message.email});
        }else{          
          emailsToSend.push({ email: message.email, message: message.text });
          }              
        }// else
      }catch(err){
        emailStatuses.push({status : "FAILURE", email : message.email}); 
      }//
    }));

    if( emailsToSend.length === 0){
      // no valid emails to process so just return              
      return res.status(200).json(emailStatuses);                     
    }// if no emails to send
    else{
      yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
        try{                   
          var newInvite =  new Invite();  
          newInvite.email = emailMessage.email;
          newInvite.message = emailMessage.message;
          var invite = yield Invite.save();

          // now try to send the email
          var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);  

          var sendmail              = new emailProvider.Email();
          sendmail.setTos(emailMessage.email);
          sendmail.setFrom(common.DEF_EMAIL_SENDER);
          sendmail.setSubject(common.EMAIL_SUBJECT);
          sendmail.setHtml(mailHTMl);

          var successMail = yield emailProvider.send(sendmail);
          emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
        }catch(err){
          //additional logging here which ive removed for purposes of brevity
          emailStatuses.push({status : "FAILURE", email : emailMessage.email});
        }           
      }));

      return res.status(200).json(emailStatuses);           
    }

  }).catch(function(err){
    //additional logging here which ive removed for purposes of brevity
    return res.status(500)
  });

У меня проблема с Promise.all, если я передаю массив, кажется, что он обрабатывает только первый элемент, даже если нет отклонения от обещания или любого типа ошибки.

Этот код работает, если я использую Promise.each, но затем он выполняется поочередно. Чего я хочу добиться, так это в основном иметь асинхронную серию с 2 async.foreach, которая будет выполняться один за другим и обрабатывать каждый элемент массива параллельно, но обрабатывать каждый массив последовательно, как показано ниже:

async.series([
  async.foreach
  async.foreach
]);

Тем не менее, я не уверен, что мне здесь не хватает для того, чтобы заставить его выполняться параллельно, потому что теперь, кажется, он работает нормально, если я использую Promise.each и получаю последовательное выполнение для каждого элемента массива.

1 ответ

Решение

Таким образом, в основном есть 2 способа сделать это, первое решение - использовать оригинальный код и просто использовать Promise.map, который я не уверен, если он выполняется параллельно, но в основном не остановится на первом элементе массива.

Второе - это довольно простое изменение отображения значений массива в функции сопрограмм, а затем выполнение Promise.all для них, как показано ниже:

Хотя, должен заметить, это заметно медленнее, чем использование async.js. Было бы полезно, если бы кто-нибудь мог объяснить, почему?

  co(function*(){  
    var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
    var doc = yield People.findOne({email:  req.body.email}).exec();

    var filePath = path.join(__dirname, '../email-template.html');                         
    var html = yield fs.readFileAsync(filePath,'utf8');
    var emailsToSend = [];
    var emailStatuses = [];
    var validEmails = [];

    //make sure email is ok
    req.body.messagesToSend.forEach(function(message){
      if(message.email != null && re.test(message.email))
      { 
        validEmails.push(message);
      }else{
        // mark it as failed...
        emailStatuses.push({success : "FAILURE", email : message.email}); 
      }
    });

    //yield Promise.all( validEmails, Promise.coroutine(function * (message){
    var firstPromises = validEmails.map(Promise.coroutine(function * (message){  
      try{
        var person = yield People.findOne({email:  message.email }).exec();

        if(person){ 
          emailStatuses.push({status : "Already exists", email : message.email});
        }else{          
          emailsToSend.push({ email: message.email, message: message.text });
          }              
        }// else
      }catch(err){
        emailStatuses.push({status : "FAILURE", email : message.email}); 
      }//
    }));

    yield Promise.all(firstPromises);

    if( emailsToSend.length === 0){
      // no valid emails to process so just return              
      return res.status(200).json(emailStatuses);                     
    }// if no emails to send
    else{
      //yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
      var secondPromises = emailsToSend.map( Promise.coroutine(function * (emailMessage){
        try{                   
          var newInvite =  new Invite();  
          newInvite.email = emailMessage.email;
          newInvite.message = emailMessage.message;
          var invite = yield Invite.save();

          // now try to send the email
          var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);  

          var sendmail              = new emailProvider.Email();
          sendmail.setTos(emailMessage.email);
          sendmail.setFrom(common.DEF_EMAIL_SENDER);
          sendmail.setSubject(common.EMAIL_SUBJECT);
          sendmail.setHtml(mailHTMl);

          var successMail = yield emailProvider.send(sendmail);
          emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
        }catch(err){
          //additional logging here which ive removed for purposes of brevity
          emailStatuses.push({status : "FAILURE", email : emailMessage.email});
        }           
      }));

      yield Promise.all(secondPromises);

      return res.status(200).json(emailStatuses);           
    }

  }).catch(function(err){
    //additional logging here which ive removed for purposes of brevity
    return res.status(500)
  });
Другие вопросы по тегам