将数据管道传输到尚未准备好接收数据的可写流
Piping data to writable stream that is not ready to receive data yet
有没有办法将可读流连接到 Node.js 中的可写流,其中可写流尚未准备好接收数据?换句话说,我想将可读与可写联系起来,但我想在程序的稍后时间初始化可写,包括定义写入方法。也许我们必须实现 write 方法,但是有没有一种方法可以像暂停可读流那样暂停可写流?或者也许我们可以使用中间 through/transform 流并在那里缓冲数据,然后再将数据通过管道传输到可写对象!
举个例子,通常我们这样做:
readable.pipe(transform).pipe(writable);
但我想做类似的事情:
const tstrm = readable.pipe(transform);
doSomethingAsync().then(function(){
tstrm.pipe(writable);
});
只是想知道这是否可能以及如何正确地做到这一点,到目前为止都很难搞清楚。
我想我希望在中间转换流中缓冲数据,然后再将其 connected/piped 写入可写流,然后,一旦连接,在任何新数据之前首先流式传输缓冲数据。似乎是合理的做法,找不到任何相关信息。
注意,我在这里使用间隔来模拟作者是否能够阅读。您可以按照您想要的任何方式执行此操作,即如果 writer returns false 您将更新状态以开始缓冲等。我认为最后一行是您想要的,即
r.pipe(b).pipe(w);
内容如下
readStrem.pipe(transformBbuffer).pipe(writeStream);
示例代码,我们可以进行一些更改以缓冲所有数据。我将在代码后描述。您需要了解的有关流的所有信息以及更多内容都在文档中,我认为他们可以使用更完整的示例来完成,但它们仍然非常好...
https://nodejs.org/api/stream.html#stream_class_stream_transform_1
这是代码。
var fs = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff = 0;
var DRAIN_ME = 0;
var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');
var BufferStream = function () {
stream.Transform.apply(this, arguments);
this.buffer = [];
};
util.inherits(BufferStream, stream.Transform);
var intId;
intId = setInterval(function(){
if(check_buff % 3 == 0) {
DRAIN_ME = 1;
return;
}
DRAIN_ME = 0;
},10);
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
while(DRAIN_ME > 0 && this.buffer.length > 0) {
this.push(this.buffer.shift());
}
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
var b = new BufferStream();
b.on('end', function(chunk) {
clearInterval(intId);
});
r.pipe(b).pipe(w);
I am looking for the canonical way to implement a transform/through
stream, that buffers all data until pipe is call on it.
进行以下更改
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
......
BufferStream.prototype._flush = function (cb) {
var len = this.buffer.length;
for (var i = 0; i < len; i++) {
this.push(this.buffer.shift());
};
cb();
};
您还可以暂停可读流,这实际上会暂停可写流,因为它停止接收数据,即...
要对此进行测试,请在磁盘上创建一个相当大的文件,即 100MB 或更大,并且 运行 这个...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
var ready = 0;
readableStream.pause();
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
writableStream.write(chunk);
});
立即暂停的原因是因为在间隔触发时 10ms
文件可能已经写入。这个有变化,即...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
var ready = 0;
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
readableStream.pause();
});
有没有办法将可读流连接到 Node.js 中的可写流,其中可写流尚未准备好接收数据?换句话说,我想将可读与可写联系起来,但我想在程序的稍后时间初始化可写,包括定义写入方法。也许我们必须实现 write 方法,但是有没有一种方法可以像暂停可读流那样暂停可写流?或者也许我们可以使用中间 through/transform 流并在那里缓冲数据,然后再将数据通过管道传输到可写对象!
举个例子,通常我们这样做:
readable.pipe(transform).pipe(writable);
但我想做类似的事情:
const tstrm = readable.pipe(transform);
doSomethingAsync().then(function(){
tstrm.pipe(writable);
});
只是想知道这是否可能以及如何正确地做到这一点,到目前为止都很难搞清楚。
我想我希望在中间转换流中缓冲数据,然后再将其 connected/piped 写入可写流,然后,一旦连接,在任何新数据之前首先流式传输缓冲数据。似乎是合理的做法,找不到任何相关信息。
注意,我在这里使用间隔来模拟作者是否能够阅读。您可以按照您想要的任何方式执行此操作,即如果 writer returns false 您将更新状态以开始缓冲等。我认为最后一行是您想要的,即
r.pipe(b).pipe(w);
内容如下
readStrem.pipe(transformBbuffer).pipe(writeStream);
示例代码,我们可以进行一些更改以缓冲所有数据。我将在代码后描述。您需要了解的有关流的所有信息以及更多内容都在文档中,我认为他们可以使用更完整的示例来完成,但它们仍然非常好...
https://nodejs.org/api/stream.html#stream_class_stream_transform_1
这是代码。
var fs = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff = 0;
var DRAIN_ME = 0;
var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');
var BufferStream = function () {
stream.Transform.apply(this, arguments);
this.buffer = [];
};
util.inherits(BufferStream, stream.Transform);
var intId;
intId = setInterval(function(){
if(check_buff % 3 == 0) {
DRAIN_ME = 1;
return;
}
DRAIN_ME = 0;
},10);
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
while(DRAIN_ME > 0 && this.buffer.length > 0) {
this.push(this.buffer.shift());
}
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
var b = new BufferStream();
b.on('end', function(chunk) {
clearInterval(intId);
});
r.pipe(b).pipe(w);
I am looking for the canonical way to implement a transform/through stream, that buffers all data until pipe is call on it.
进行以下更改
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
......
BufferStream.prototype._flush = function (cb) {
var len = this.buffer.length;
for (var i = 0; i < len; i++) {
this.push(this.buffer.shift());
};
cb();
};
您还可以暂停可读流,这实际上会暂停可写流,因为它停止接收数据,即...
要对此进行测试,请在磁盘上创建一个相当大的文件,即 100MB 或更大,并且 运行 这个...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
var ready = 0;
readableStream.pause();
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
writableStream.write(chunk);
});
立即暂停的原因是因为在间隔触发时 10ms
文件可能已经写入。这个有变化,即...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
var ready = 0;
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
readableStream.pause();
});