Node Coroutines Параллельное управление потоком с генераторами и Promise
Я пытаюсь имитировать поток управления библиотеки async.js с сопрограммами и обещаниями, используя оба co
а также bluebird.js
но я сталкиваюсь с некоторыми проблемами. Мой код выглядит следующим образом, хотя это в основном псевдо-код, потому что реальный код был бы очень длинным, я могу добавить этот код позже, если потребуется...
co(function*(){
var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
var doc = yield People.findOne({email: req.body.email}).exec();
var filePath = path.join(__dirname, '../email-template.html');
var html = yield fs.readFileAsync(filePath,'utf8');
var emailsToSend = [];
var emailStatuses = [];
var validEmails = [];
//make sure email is ok
req.body.messagesToSend.forEach(function(message){
if(message.email != null && re.test(message.email))
{
validEmails.push(message);
}else{
// mark it as failed...
emailStatuses.push({success : "FAILURE", email : message.email});
}
});
yield Promise.all( validEmails, Promise.coroutine(function * (message){
try{
var person = yield People.findOne({email: message.email }).exec();
if(person){
emailStatuses.push({status : "Already exists", email : message.email});
}else{
emailsToSend.push({ email: message.email, message: message.text });
}
}// else
}catch(err){
emailStatuses.push({status : "FAILURE", email : message.email});
}//
}));
if( emailsToSend.length === 0){
// no valid emails to process so just return
return res.status(200).json(emailStatuses);
}// if no emails to send
else{
yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
try{
var newInvite = new Invite();
newInvite.email = emailMessage.email;
newInvite.message = emailMessage.message;
var invite = yield Invite.save();
// now try to send the email
var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);
var sendmail = new emailProvider.Email();
sendmail.setTos(emailMessage.email);
sendmail.setFrom(common.DEF_EMAIL_SENDER);
sendmail.setSubject(common.EMAIL_SUBJECT);
sendmail.setHtml(mailHTMl);
var successMail = yield emailProvider.send(sendmail);
emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
}catch(err){
//additional logging here which ive removed for purposes of brevity
emailStatuses.push({status : "FAILURE", email : emailMessage.email});
}
}));
return res.status(200).json(emailStatuses);
}
}).catch(function(err){
//additional logging here which ive removed for purposes of brevity
return res.status(500)
});
У меня проблема с Promise.all, если я передаю массив, кажется, что он обрабатывает только первый элемент, даже если нет отклонения от обещания или любого типа ошибки.
Этот код работает, если я использую Promise.each, но затем он выполняется поочередно. Чего я хочу добиться, так это в основном иметь асинхронную серию с 2 async.foreach, которая будет выполняться один за другим и обрабатывать каждый элемент массива параллельно, но обрабатывать каждый массив последовательно, как показано ниже:
async.series([
async.foreach
async.foreach
]);
Тем не менее, я не уверен, что мне здесь не хватает для того, чтобы заставить его выполняться параллельно, потому что теперь, кажется, он работает нормально, если я использую Promise.each и получаю последовательное выполнение для каждого элемента массива.
1 ответ
Таким образом, в основном есть 2 способа сделать это, первое решение - использовать оригинальный код и просто использовать Promise.map, который я не уверен, если он выполняется параллельно, но в основном не остановится на первом элементе массива.
Второе - это довольно простое изменение отображения значений массива в функции сопрограмм, а затем выполнение Promise.all для них, как показано ниже:
Хотя, должен заметить, это заметно медленнее, чем использование async.js. Было бы полезно, если бы кто-нибудь мог объяснить, почему?
co(function*(){
var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
var doc = yield People.findOne({email: req.body.email}).exec();
var filePath = path.join(__dirname, '../email-template.html');
var html = yield fs.readFileAsync(filePath,'utf8');
var emailsToSend = [];
var emailStatuses = [];
var validEmails = [];
//make sure email is ok
req.body.messagesToSend.forEach(function(message){
if(message.email != null && re.test(message.email))
{
validEmails.push(message);
}else{
// mark it as failed...
emailStatuses.push({success : "FAILURE", email : message.email});
}
});
//yield Promise.all( validEmails, Promise.coroutine(function * (message){
var firstPromises = validEmails.map(Promise.coroutine(function * (message){
try{
var person = yield People.findOne({email: message.email }).exec();
if(person){
emailStatuses.push({status : "Already exists", email : message.email});
}else{
emailsToSend.push({ email: message.email, message: message.text });
}
}// else
}catch(err){
emailStatuses.push({status : "FAILURE", email : message.email});
}//
}));
yield Promise.all(firstPromises);
if( emailsToSend.length === 0){
// no valid emails to process so just return
return res.status(200).json(emailStatuses);
}// if no emails to send
else{
//yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
var secondPromises = emailsToSend.map( Promise.coroutine(function * (emailMessage){
try{
var newInvite = new Invite();
newInvite.email = emailMessage.email;
newInvite.message = emailMessage.message;
var invite = yield Invite.save();
// now try to send the email
var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);
var sendmail = new emailProvider.Email();
sendmail.setTos(emailMessage.email);
sendmail.setFrom(common.DEF_EMAIL_SENDER);
sendmail.setSubject(common.EMAIL_SUBJECT);
sendmail.setHtml(mailHTMl);
var successMail = yield emailProvider.send(sendmail);
emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
}catch(err){
//additional logging here which ive removed for purposes of brevity
emailStatuses.push({status : "FAILURE", email : emailMessage.email});
}
}));
yield Promise.all(secondPromises);
return res.status(200).json(emailStatuses);
}
}).catch(function(err){
//additional logging here which ive removed for purposes of brevity
return res.status(500)
});