使用生成器和 Promise 的节点协程并行流控制
Node Coroutines Parallel Flow Control with Generators and Promise
我正在尝试使用协程和承诺来模仿 async.js 库的控制流,同时使用 co
和 bluebird.js
但我 运行 遇到了一些问题.我的代码如下,虽然这主要是伪代码,因为实际代码会很长,如果需要我可以稍后添加实际代码...
co(function*(){
var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
var doc = yield People.findOne({email: req.body.email}).exec();
var filePath = path.join(__dirname, '../email-template.html');
var html = yield fs.readFileAsync(filePath,'utf8');
var emailsToSend = [];
var emailStatuses = [];
var validEmails = [];
//make sure email is ok
req.body.messagesToSend.forEach(function(message){
if(message.email != null && re.test(message.email))
{
validEmails.push(message);
}else{
// mark it as failed...
emailStatuses.push({success : "FAILURE", email : message.email});
}
});
yield Promise.all( validEmails, Promise.coroutine(function * (message){
try{
var person = yield People.findOne({email: message.email }).exec();
if(person){
emailStatuses.push({status : "Already exists", email : message.email});
}else{
emailsToSend.push({ email: message.email, message: message.text });
}
}// else
}catch(err){
emailStatuses.push({status : "FAILURE", email : message.email});
}//
}));
if( emailsToSend.length === 0){
// no valid emails to process so just return
return res.status(200).json(emailStatuses);
}// if no emails to send
else{
yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
try{
var newInvite = new Invite();
newInvite.email = emailMessage.email;
newInvite.message = emailMessage.message;
var invite = yield Invite.save();
// now try to send the email
var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);
var sendmail = new emailProvider.Email();
sendmail.setTos(emailMessage.email);
sendmail.setFrom(common.DEF_EMAIL_SENDER);
sendmail.setSubject(common.EMAIL_SUBJECT);
sendmail.setHtml(mailHTMl);
var successMail = yield emailProvider.send(sendmail);
emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
}catch(err){
//additional logging here which ive removed for purposes of brevity
emailStatuses.push({status : "FAILURE", email : emailMessage.email});
}
}));
return res.status(200).json(emailStatuses);
}
}).catch(function(err){
//additional logging here which ive removed for purposes of brevity
return res.status(500)
});
我遇到的问题是 Promise.all,如果我传入一个数组,它似乎只处理第一个元素,即使没有拒绝承诺或任何类型的错误。
如果我使用 Promise.each,这段代码可以工作,但它是串行执行的。我想要实现的基本上是有一个带有 2 async.foreach 的异步系列,它将一个接一个地执行并并行处理每个数组项,但按顺序处理每个数组,如下所示:
async.series([
async.foreach
async.foreach
]);
但是,我不确定为了让它并行执行我在这里遗漏了什么,因为如果我使用 Promise.each 并为每个数组项串行执行,它现在似乎工作正常.
所以基本上有2种方法可以实现,第一种解决方案是使用原始代码,只使用Promise.map,我不确定它是否并行执行,但基本上不会在第一个数组元素处停止。
第二个是将数组值映射到协程函数的相当简单的更改,然后对它们执行 Promise.all,如下所示:
不过,我必须指出,这明显比使用 async.js 慢。如果有人能解释为什么会有所帮助?
co(function*(){
var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
var doc = yield People.findOne({email: req.body.email}).exec();
var filePath = path.join(__dirname, '../email-template.html');
var html = yield fs.readFileAsync(filePath,'utf8');
var emailsToSend = [];
var emailStatuses = [];
var validEmails = [];
//make sure email is ok
req.body.messagesToSend.forEach(function(message){
if(message.email != null && re.test(message.email))
{
validEmails.push(message);
}else{
// mark it as failed...
emailStatuses.push({success : "FAILURE", email : message.email});
}
});
//yield Promise.all( validEmails, Promise.coroutine(function * (message){
var firstPromises = validEmails.map(Promise.coroutine(function * (message){
try{
var person = yield People.findOne({email: message.email }).exec();
if(person){
emailStatuses.push({status : "Already exists", email : message.email});
}else{
emailsToSend.push({ email: message.email, message: message.text });
}
}// else
}catch(err){
emailStatuses.push({status : "FAILURE", email : message.email});
}//
}));
yield Promise.all(firstPromises);
if( emailsToSend.length === 0){
// no valid emails to process so just return
return res.status(200).json(emailStatuses);
}// if no emails to send
else{
//yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
var secondPromises = emailsToSend.map( Promise.coroutine(function * (emailMessage){
try{
var newInvite = new Invite();
newInvite.email = emailMessage.email;
newInvite.message = emailMessage.message;
var invite = yield Invite.save();
// now try to send the email
var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);
var sendmail = new emailProvider.Email();
sendmail.setTos(emailMessage.email);
sendmail.setFrom(common.DEF_EMAIL_SENDER);
sendmail.setSubject(common.EMAIL_SUBJECT);
sendmail.setHtml(mailHTMl);
var successMail = yield emailProvider.send(sendmail);
emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
}catch(err){
//additional logging here which ive removed for purposes of brevity
emailStatuses.push({status : "FAILURE", email : emailMessage.email});
}
}));
yield Promise.all(secondPromises);
return res.status(200).json(emailStatuses);
}
}).catch(function(err){
//additional logging here which ive removed for purposes of brevity
return res.status(500)
});
我正在尝试使用协程和承诺来模仿 async.js 库的控制流,同时使用 co
和 bluebird.js
但我 运行 遇到了一些问题.我的代码如下,虽然这主要是伪代码,因为实际代码会很长,如果需要我可以稍后添加实际代码...
co(function*(){
var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
var doc = yield People.findOne({email: req.body.email}).exec();
var filePath = path.join(__dirname, '../email-template.html');
var html = yield fs.readFileAsync(filePath,'utf8');
var emailsToSend = [];
var emailStatuses = [];
var validEmails = [];
//make sure email is ok
req.body.messagesToSend.forEach(function(message){
if(message.email != null && re.test(message.email))
{
validEmails.push(message);
}else{
// mark it as failed...
emailStatuses.push({success : "FAILURE", email : message.email});
}
});
yield Promise.all( validEmails, Promise.coroutine(function * (message){
try{
var person = yield People.findOne({email: message.email }).exec();
if(person){
emailStatuses.push({status : "Already exists", email : message.email});
}else{
emailsToSend.push({ email: message.email, message: message.text });
}
}// else
}catch(err){
emailStatuses.push({status : "FAILURE", email : message.email});
}//
}));
if( emailsToSend.length === 0){
// no valid emails to process so just return
return res.status(200).json(emailStatuses);
}// if no emails to send
else{
yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
try{
var newInvite = new Invite();
newInvite.email = emailMessage.email;
newInvite.message = emailMessage.message;
var invite = yield Invite.save();
// now try to send the email
var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);
var sendmail = new emailProvider.Email();
sendmail.setTos(emailMessage.email);
sendmail.setFrom(common.DEF_EMAIL_SENDER);
sendmail.setSubject(common.EMAIL_SUBJECT);
sendmail.setHtml(mailHTMl);
var successMail = yield emailProvider.send(sendmail);
emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
}catch(err){
//additional logging here which ive removed for purposes of brevity
emailStatuses.push({status : "FAILURE", email : emailMessage.email});
}
}));
return res.status(200).json(emailStatuses);
}
}).catch(function(err){
//additional logging here which ive removed for purposes of brevity
return res.status(500)
});
我遇到的问题是 Promise.all,如果我传入一个数组,它似乎只处理第一个元素,即使没有拒绝承诺或任何类型的错误。
如果我使用 Promise.each,这段代码可以工作,但它是串行执行的。我想要实现的基本上是有一个带有 2 async.foreach 的异步系列,它将一个接一个地执行并并行处理每个数组项,但按顺序处理每个数组,如下所示:
async.series([
async.foreach
async.foreach
]);
但是,我不确定为了让它并行执行我在这里遗漏了什么,因为如果我使用 Promise.each 并为每个数组项串行执行,它现在似乎工作正常.
所以基本上有2种方法可以实现,第一种解决方案是使用原始代码,只使用Promise.map,我不确定它是否并行执行,但基本上不会在第一个数组元素处停止。
第二个是将数组值映射到协程函数的相当简单的更改,然后对它们执行 Promise.all,如下所示:
不过,我必须指出,这明显比使用 async.js 慢。如果有人能解释为什么会有所帮助?
co(function*(){
var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
var doc = yield People.findOne({email: req.body.email}).exec();
var filePath = path.join(__dirname, '../email-template.html');
var html = yield fs.readFileAsync(filePath,'utf8');
var emailsToSend = [];
var emailStatuses = [];
var validEmails = [];
//make sure email is ok
req.body.messagesToSend.forEach(function(message){
if(message.email != null && re.test(message.email))
{
validEmails.push(message);
}else{
// mark it as failed...
emailStatuses.push({success : "FAILURE", email : message.email});
}
});
//yield Promise.all( validEmails, Promise.coroutine(function * (message){
var firstPromises = validEmails.map(Promise.coroutine(function * (message){
try{
var person = yield People.findOne({email: message.email }).exec();
if(person){
emailStatuses.push({status : "Already exists", email : message.email});
}else{
emailsToSend.push({ email: message.email, message: message.text });
}
}// else
}catch(err){
emailStatuses.push({status : "FAILURE", email : message.email});
}//
}));
yield Promise.all(firstPromises);
if( emailsToSend.length === 0){
// no valid emails to process so just return
return res.status(200).json(emailStatuses);
}// if no emails to send
else{
//yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
var secondPromises = emailsToSend.map( Promise.coroutine(function * (emailMessage){
try{
var newInvite = new Invite();
newInvite.email = emailMessage.email;
newInvite.message = emailMessage.message;
var invite = yield Invite.save();
// now try to send the email
var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);
var sendmail = new emailProvider.Email();
sendmail.setTos(emailMessage.email);
sendmail.setFrom(common.DEF_EMAIL_SENDER);
sendmail.setSubject(common.EMAIL_SUBJECT);
sendmail.setHtml(mailHTMl);
var successMail = yield emailProvider.send(sendmail);
emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
}catch(err){
//additional logging here which ive removed for purposes of brevity
emailStatuses.push({status : "FAILURE", email : emailMessage.email});
}
}));
yield Promise.all(secondPromises);
return res.status(200).json(emailStatuses);
}
}).catch(function(err){
//additional logging here which ive removed for purposes of brevity
return res.status(500)
});