将数据放回可读流
Putting data back onto a Readable stream
TL;DR 我如何从流中读取一些数据然后将其放回原处以允许其他消费者获得相同的 data
事件?
这是一个流式传输 1...Infinity 的可读流:
var Readable = require('stream').Readable;
var readable = new Readable();
var c = 0;
readable._read = function () {
var self = this;
setTimeout(function () {
self.push((++c).toString());
}, 500);
};
我想读取第一个 data
事件,查看数据,然后 "reset" 将流恢复到其原始状态并允许其他另一个 data
侦听器使用第一个事件就好像它从未发生过一样。我认为 unshift()
是文档中所说的正确方法:
readable.unshift(chunk)#
chunk Buffer | String Chunk of data to unshift onto the read queue
This is useful in certain cases where a stream is being consumed by a
parser, which needs to "un-consume" some data that it has
optimistically pulled out of the source, so that the stream can be
passed on to some other party.
这听起来很适合我的需要,但它并不像我期望的那样有效:
...
readable.once('data', function (d) {
console.log(d.toString()); // Outputs 1
readable.unshift(d); // Put the 1 back on the stream
readable.on('data', function (d) {
console.log(d.toString()); // Heh?! Outputs 2, how about 1?
});
});
所以我想出了答案:
当您调用 stream.unshift()
时,如果流处于流动模式,if 将立即发出数据事件。所以当我在示例中添加监听器时,船已经起航了。
readable.unshift(d); // emits 'data' event
readable.on('data', function (d) { // missed `data` event
console.log(d.toString());
});
有几种方法可以让它按我的预期工作:
1) 在取消移位之前添加新的监听器:
readable.once('data', function (d) {
console.log(d.toString()); // Outputs 1
readable.on('data', function (d) {
console.log(d.toString()); // Outputs 1,1,2,3...
});
readable.unshift(d); // Put the 1 back on the stream
});
2) 暂停和恢复流:
readable.once('data', function (d) {
console.log(d.toString()); // Outputs 1
readable.pause(); // Stops the stream from flowing
readable.unshift(d); // Put the 1 back on the stream
readable.on('data', function (d) {
console.log(d.toString()); // Outputs 1,1,2,3...
});
readable.resume(); // Start the stream flowing again
});
TL;DR 我如何从流中读取一些数据然后将其放回原处以允许其他消费者获得相同的 data
事件?
这是一个流式传输 1...Infinity 的可读流:
var Readable = require('stream').Readable;
var readable = new Readable();
var c = 0;
readable._read = function () {
var self = this;
setTimeout(function () {
self.push((++c).toString());
}, 500);
};
我想读取第一个 data
事件,查看数据,然后 "reset" 将流恢复到其原始状态并允许其他另一个 data
侦听器使用第一个事件就好像它从未发生过一样。我认为 unshift()
是文档中所说的正确方法:
readable.unshift(chunk)#
chunk Buffer | String Chunk of data to unshift onto the read queue This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.
这听起来很适合我的需要,但它并不像我期望的那样有效:
...
readable.once('data', function (d) {
console.log(d.toString()); // Outputs 1
readable.unshift(d); // Put the 1 back on the stream
readable.on('data', function (d) {
console.log(d.toString()); // Heh?! Outputs 2, how about 1?
});
});
所以我想出了答案:
当您调用 stream.unshift()
时,如果流处于流动模式,if 将立即发出数据事件。所以当我在示例中添加监听器时,船已经起航了。
readable.unshift(d); // emits 'data' event
readable.on('data', function (d) { // missed `data` event
console.log(d.toString());
});
有几种方法可以让它按我的预期工作:
1) 在取消移位之前添加新的监听器:
readable.once('data', function (d) {
console.log(d.toString()); // Outputs 1
readable.on('data', function (d) {
console.log(d.toString()); // Outputs 1,1,2,3...
});
readable.unshift(d); // Put the 1 back on the stream
});
2) 暂停和恢复流:
readable.once('data', function (d) {
console.log(d.toString()); // Outputs 1
readable.pause(); // Stops the stream from flowing
readable.unshift(d); // Put the 1 back on the stream
readable.on('data', function (d) {
console.log(d.toString()); // Outputs 1,1,2,3...
});
readable.resume(); // Start the stream flowing again
});