如何在流的末尾退出 nodejs,以便将最后的值保存到 db,并正确记录
How to exit nodejs at the end of the stream, so last values are saved to db, and log properly
我有一个 baconjs
流,用于保存带有 mongoose
的文章。这是代码:
function main() {
db.once('open', function(callback) {
console.log('db connection successs');
console.log('stremaing page ' + page);
var stream = readSite(page);
stream.onValue(function(article) {
try {
article = toArticleModel(article);
} catch (e) {
console.error(e);
}
article.save(function(err, article) {
if (err) {
console.error('Save Error: ' + err);
} else {
console.log('saved ' + article.publishedDate + " " + article.author.name + " " + article.title);
}
});
stream.onError(function(err) {
console.error('Stream Error: ' + err);
});
stream.onEnd(function() {
console.log('stream ended closing in 15 seconds..');
setTimeout(function() {
db.close();
}, 15 * 1000);
});
});
});
流结束后 onEnd
,我想关闭数据库连接并退出 nodejs 程序。我想一旦流结束,我应该等待一段时间,以便将最新值保存到数据库中。所以我用setTimeout
15秒,然后用db.close
。
问题是,日志 stream ended closing in 15 seconds..
在 stdout 中记录了 50-60 次,为什么?这是退出此类程序的好方法吗?
您正在 stream.onValue
的 中添加 stream.onEnd
侦听器 。这意味着您在每次触发 onValue
时添加侦听器。您需要在其他事件侦听器之外添加事件侦听器,以便它们只添加一次。
有多种方法可以处理异步迭代,以确保在关闭数据库之前保存所有文章值(您实际上可以跳过这一步)。
// initialize savingArticles to 0
savingArticles++;
article.save(function (err, article) {
savingArticles--
if (streamEnded && !savingArticles)
db.close();
stream.onEnd(function () {
if (!savingArticles)
db.close();
else
streamEnded = true;
如果当前未保存文章且流已结束,这将关闭数据库。如果当前正在保存文章时流结束,则应在保存该文章后关闭数据库。
我会尝试为此使用 .fromNodeCallback
和 .flatMap
:
stream = stream.flatMap(function(article) {
try {
article = toArticleModel(article);
} catch (e) {
return new Bacon.Error('toArticleModel error: ' + err);
}
return Bacon.fromNodeCallback(function(callback) {
article.save(function(err, article) {
if (err) {
callback('Save Error: ' + err);
} else {
callback('saved ' + article.publishedDate + " " + article.author.name + " " + article.title);
}
});
});
});
stream.onError(console.error.bind(console));
stream.onValue(console.log.bind(console));
stream.onEnd(db.close.bind(db));
还可以进一步简化:
stream = stream.flatMap(function(article) {
try {
article = toArticleModel(article);
} catch (e) {
return new Bacon.Error(err);
}
return Bacon.fromNodeCallback(article, "save")
.map(function() {
return "saved " + article.publishedDate + " " +
article.author.name + " " + article.title;
});
});
stream.log();
stream.onEnd(db.close.bind(db));
我有一个 baconjs
流,用于保存带有 mongoose
的文章。这是代码:
function main() {
db.once('open', function(callback) {
console.log('db connection successs');
console.log('stremaing page ' + page);
var stream = readSite(page);
stream.onValue(function(article) {
try {
article = toArticleModel(article);
} catch (e) {
console.error(e);
}
article.save(function(err, article) {
if (err) {
console.error('Save Error: ' + err);
} else {
console.log('saved ' + article.publishedDate + " " + article.author.name + " " + article.title);
}
});
stream.onError(function(err) {
console.error('Stream Error: ' + err);
});
stream.onEnd(function() {
console.log('stream ended closing in 15 seconds..');
setTimeout(function() {
db.close();
}, 15 * 1000);
});
});
});
流结束后 onEnd
,我想关闭数据库连接并退出 nodejs 程序。我想一旦流结束,我应该等待一段时间,以便将最新值保存到数据库中。所以我用setTimeout
15秒,然后用db.close
。
问题是,日志 stream ended closing in 15 seconds..
在 stdout 中记录了 50-60 次,为什么?这是退出此类程序的好方法吗?
您正在 stream.onValue
的 中添加 stream.onEnd
侦听器 。这意味着您在每次触发 onValue
时添加侦听器。您需要在其他事件侦听器之外添加事件侦听器,以便它们只添加一次。
有多种方法可以处理异步迭代,以确保在关闭数据库之前保存所有文章值(您实际上可以跳过这一步)。
// initialize savingArticles to 0
savingArticles++;
article.save(function (err, article) {
savingArticles--
if (streamEnded && !savingArticles)
db.close();
stream.onEnd(function () {
if (!savingArticles)
db.close();
else
streamEnded = true;
如果当前未保存文章且流已结束,这将关闭数据库。如果当前正在保存文章时流结束,则应在保存该文章后关闭数据库。
我会尝试为此使用 .fromNodeCallback
和 .flatMap
:
stream = stream.flatMap(function(article) {
try {
article = toArticleModel(article);
} catch (e) {
return new Bacon.Error('toArticleModel error: ' + err);
}
return Bacon.fromNodeCallback(function(callback) {
article.save(function(err, article) {
if (err) {
callback('Save Error: ' + err);
} else {
callback('saved ' + article.publishedDate + " " + article.author.name + " " + article.title);
}
});
});
});
stream.onError(console.error.bind(console));
stream.onValue(console.log.bind(console));
stream.onEnd(db.close.bind(db));
还可以进一步简化:
stream = stream.flatMap(function(article) {
try {
article = toArticleModel(article);
} catch (e) {
return new Bacon.Error(err);
}
return Bacon.fromNodeCallback(article, "save")
.map(function() {
return "saved " + article.publishedDate + " " +
article.author.name + " " + article.title;
});
});
stream.log();
stream.onEnd(db.close.bind(db));