如何在流的末尾退出 nodejs,以便将最后的值保存到 db,并正确记录

How to exit nodejs at the end of the stream, so last values are saved to db, and log properly

我有一个 baconjs 流,用于保存带有 mongoose 的文章。这是代码:

function main() {
  db.once('open', function(callback) {
    console.log('db connection successs');
    console.log('stremaing page ' + page);

    var stream = readSite(page);
    stream.onValue(function(article) {
      try {
        article = toArticleModel(article);
      } catch (e) {
        console.error(e);
      }
      article.save(function(err, article) {
        if (err) {
          console.error('Save Error: ' + err);
        } else {
          console.log('saved ' + article.publishedDate + " " + article.author.name + " " + article.title);
        }
      });

      stream.onError(function(err) {
        console.error('Stream Error: ' + err);
      });

      stream.onEnd(function() {
        console.log('stream ended closing in 15 seconds..');
        setTimeout(function() {
          db.close();
        }, 15 * 1000);
      });
   });
 });

流结束后 onEnd,我想关闭数据库连接并退出 nodejs 程序。我想一旦流结束,我应该等待一段时间,以便将最新值保存到数据库中。所以我用setTimeout15秒,然后用db.close

问题是,日志 stream ended closing in 15 seconds.. 在 stdout 中记录了 50-60 次,为什么?这是退出此类程序的好方法吗?

您正在 stream.onValue 的 中添加 stream.onEnd 侦听器 。这意味着您在每次触发 onValue 时添加侦听器。您需要在其他事件侦听器之外添加事件侦听器,以便它们只添加一次。

有多种方法可以处理异步迭代,以确保在关闭数据库之前保存所有文章值(您实际上可以跳过这一步)。

// initialize savingArticles to 0

savingArticles++;
article.save(function (err, article) {
  savingArticles--
  if (streamEnded && !savingArticles)
    db.close();

stream.onEnd(function () {
  if (!savingArticles)
    db.close();
  else
    streamEnded = true;

如果当前未保存文章且流已结束,这将关闭数据库。如果当前正在保存文章时流结束,则应在保存该文章后关闭数据库。

我会尝试为此使用 .fromNodeCallback.flatMap

stream = stream.flatMap(function(article) {

  try {
    article = toArticleModel(article);
  } catch (e) {
    return new Bacon.Error('toArticleModel error: ' + err);
  }

  return Bacon.fromNodeCallback(function(callback) {
    article.save(function(err, article) {
      if (err) {
        callback('Save Error: ' + err);
      } else {
        callback('saved ' + article.publishedDate + " " + article.author.name + " " + article.title);
      }
    });
  });

});

stream.onError(console.error.bind(console));
stream.onValue(console.log.bind(console));
stream.onEnd(db.close.bind(db));

还可以进一步简化:

stream = stream.flatMap(function(article) {

  try {
    article = toArticleModel(article);
  } catch (e) {
    return new Bacon.Error(err);
  }

  return Bacon.fromNodeCallback(article, "save")
    .map(function() {
      return "saved " + article.publishedDate + " " + 
        article.author.name + " " + article.title;
    });

});

stream.log();
stream.onEnd(db.close.bind(db));