NodeJS 流和过早结束

NodeJS streams and premature end

假设 NodeJS 中有一个可读流和一个与之关联的数据 (on('data', ...)) 事件处理程序相对较慢,是否有可能在最后一个数据处理程序完成之前触发 End 事件,如果是这样,它会提前终止该处理程序吗?或者,是否会调度所有数据事件并且 运行?

就我而言,我正在处理大文件并希望将每个数据块提交给数据库。我担心如果在处理程序中的最后一个数据库调用实际完成之前触发 End,我可能会丢失最后一个或两个(或更多)记录。

事件 'end' 在最后一个 'data' 事件之后触发。但它可能发生在最后一个数据处理程序完成之前。有可能在一个 'data' 处理程序完成之前,启动下一个。这取决于您的代码中包含的内容,但稍后对事件 'data' 的调用可能会早于完成。它可能会导致您的代码出现错误和问题。

如何导致问题的示例(对您自己的测试):

  var fs = require('fs');
  var rr = fs.createReadStream('somebigfile.jpg');
  var i=0;
  rr.on('data', function(chunk) {
    i++;
    var s = i;
    console.log('readable:' + s);
    setTimeout(function(){
      console.log('timeout:'+s);
    }, 50-i*10);
  });
  rr.on('end', function() {
    console.log('end');
  });

它会在启动每个 'data' 事件处理程序时打印在您的控制台中。并在几毫秒后完成。完成顺序可能不同。

解决方案:

可读流有两种模式 'flowing mode' 和一种 'paused mode'。当您添加 'data' 事件处理程序时,您会自动将可读流设置为流动模式。

来自 documentation

When in flowing mode, data is read from the underlying system and provided to your program as fast as possible

在此模式下,事件不会等待您的缓慢操作完成。您的需求是 'paused mode'.

来自文档:

In paused mode, you must explicitly call stream.read() to get chunks of data out. Streams start out in paused mode.

换句话说:您需要数据块,您得到它,使用它,当您准备好时,您会请求新的数据块。在此模式下,您可以控制何时获取数据。

如何改成'paused mode':

这是此流的默认模式。但是,当您注册 'data' 事件处理程序时,它会切换到 'flowing mode'。因此不使用 readstream.on('data',...) 而是在触发时使用 readstream.on('readable', function(){...}) ,这意味着流已准备好提供数据块。要获取数据块,请使用 var chunk = readstream.read();

来自文档的示例:

var fs = require('fs');
var rr = fs.createReadStream('foo.txt');
rr.on('readable', function() {
  console.log('readable:', rr.read());
});
rr.on('end', function() {
  console.log('end');
});

请阅读文档了解更多详情,因为当流自动切换到 'flowing mode' 时有更多可能性。

使用慢速处理程序和流动模式:

如果你want/need在'flowing mode'工作,也有解决办法。您可以暂停和恢复流。当您从读取流('data')获得块时,暂停流,并在完成工作后恢复它。

文档中的示例:

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(function() {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});