将数据放回可读流

Putting data back onto a Readable stream

TL;DR 我如何从流中读取一些数据然后将其放回原处以允许其他消费者获得相同的 data 事件?

这是一个流式传输 1...Infinity 的可读流:

var Readable = require('stream').Readable;

var readable = new Readable();

var c = 0;

readable._read = function () {

    var self = this;

    setTimeout(function () {
        self.push((++c).toString());
    }, 500);
};

我想读取第一个 data 事件,查看数据,然后 "reset" 将流恢复到其原始状态并允许其他另一个 data 侦听器使用第一个事件就好像它从未发生过一样。我认为 unshift() 是文档中所说的正确方法:

readable.unshift(chunk)#

chunk Buffer | String Chunk of data to unshift onto the read queue This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.

这听起来很适合我的需要,但它并不像我期望的那样有效:

...

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Heh?! Outputs 2, how about 1?
    });

});

所以我想出了答案:

当您调用 stream.unshift() 时,如果流处于流动模式,if 将立即发出数据事件。所以当我在示例中添加监听器时,船已经起航了。

readable.unshift(d);                  // emits 'data' event

readable.on('data', function (d) {    // missed `data` event
    console.log(d.toString());
});

有几种方法可以让它按我的预期工作:

1) 在取消移位之前添加新的监听器:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.unshift(d);                    // Put the 1 back on the stream

});

2) 暂停和恢复流:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1
    readable.pause();                       // Stops the stream from flowing
    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.resume();                      // Start the stream flowing again

});