Node.js 流可读转换
Node.js Streams Readable to Transform
我一直在尝试使用可读和转换流来处理非常大的文件。我似乎遇到的问题是,如果我不在末尾放置可写流,程序似乎会在返回结果之前终止。
示例:rstream.pipe(split()).pipe(tstream)
我的 tstream
有一个发射器,当计数器达到阈值时发射。当该阈值设置为较低的数字时,我会得到一个结果,但当它很高时,它不会返回任何内容。如果我将它通过管道传输到文件编写器,它总是 returns 结果。我是否遗漏了一些明显的东西?
代码:
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream);
// this always works
rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
这是 Qtransformstream
的代码
// Dependencies
var Transform = require('stream').Transform,
util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
// Create this as a Transform Stream
Transform.call(this, {
objectMode: true
});
// Default the Qbase to 32 as an assumption
this.Qbase = 32;
if (Quser) {
this.Quser = Quser;
} else {
this.Quser = 20;
}
this.Qpass = this.Quser + this.Qbase;
this.Counter = 0;
// Variables used as intermediates
this.Qmin = 120;
this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
var Qmin = this.Qmin;
var Qmax = this.Qmax;
var Qbase = this.Qbase;
var Quser = this.Quser;
this.Counter++;
// Stop the stream after 100 reads and emit the data
if (this.Counter === 100) {
this.emit('completed', this.Qbase, this.Quser);
}
// do some calcs on this.Qbase
this.push('something not important');
callback();
};
// export the object
module.exports = TransformStream;
编辑:
此外,我不知道你的计数器有多高,但如果你填满了缓冲区,它将停止将数据传递到转换流,在这种情况下 completed
永远不会真正命中,因为你永远不会到达计数器限制。尝试更改您的 highwatermark
.
编辑 2:更好的解释
如您所知,transform stream
是双工流,这基本上意味着它可以从源接受数据,并且可以将数据发送到目的地。这通常分别称为读和写。 transform stream
继承自 read stream
和 Node.js 实现的 write stream
。不过有一点需要注意,transform stream
不必实现 _read 或 _write 函数。 从这个意义上说,您可以将其视为鲜为人知的 passthrough stream.
如果您考虑 transform stream
实现 write stream
这一事实,您还必须考虑这样一个事实,即写入流始终有一个转储其内容的目的地。您遇到的问题 是当您创建transform stream
时您无法指定一个地方来发送您的内容。 通过转换流完全传递数据的唯一方法是将其通过管道传输到写入流,否则,实质上您的流会备份并且无法接受更多数据,因为数据没有位置去。
这就是为什么当您通过管道传输到写入流时它始终有效。写入流通过将数据发送到目的地来减轻数据备份,因此您的所有数据都将通过管道传输并发出完成事件。
当样本量较小时,您的代码在没有写入流的情况下工作的原因是您没有填充流,因此转换流可以接受足够的数据以允许完整的 event/threshold被击中。随着阈值的增加,您的流在不将其发送到另一个地方(写入流)的情况下可以接受的数据量保持不变。这会导致您的流备份,并且它不能再接受数据,这意味着将永远不会发出已完成的事件。
我敢说,如果您增加转换流的 highwatermark
,您将能够增加您的阈值并且代码仍然有效。但是这种方法是不正确的。将您的流通过管道传输到写入流,该写入流会将数据发送到 dev/null 创建该写入流的方法是:
var writer = fs.createWriteStream('/dev/null');
Node.js 文档中有关 buffering 的部分解释了您 运行 遇到的错误。
您不中断 _transform 并且进程会走得很远。尝试:
this.emit('completed', ...);
this.end();
这就是为什么 'program seems to terminate before the result gets returned'
并且不要输出任何无用的数据:
var wstream = fs.createWriteStream('/dev/null');
祝你好运)
我建议使用 Writable 而不是转换流。
然后将 _transform
重命名为 _write
并且如果您通过管道传输到它,您的代码将使用该流。正如@Bradgnar 已经指出的那样,转换流需要一个消费者,否则它将 stop the readable 将更多数据推送到其缓冲区。
我一直在尝试使用可读和转换流来处理非常大的文件。我似乎遇到的问题是,如果我不在末尾放置可写流,程序似乎会在返回结果之前终止。
示例:rstream.pipe(split()).pipe(tstream)
我的 tstream
有一个发射器,当计数器达到阈值时发射。当该阈值设置为较低的数字时,我会得到一个结果,但当它很高时,它不会返回任何内容。如果我将它通过管道传输到文件编写器,它总是 returns 结果。我是否遗漏了一些明显的东西?
代码:
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream);
// this always works
rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
这是 Qtransformstream
// Dependencies
var Transform = require('stream').Transform,
util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
// Create this as a Transform Stream
Transform.call(this, {
objectMode: true
});
// Default the Qbase to 32 as an assumption
this.Qbase = 32;
if (Quser) {
this.Quser = Quser;
} else {
this.Quser = 20;
}
this.Qpass = this.Quser + this.Qbase;
this.Counter = 0;
// Variables used as intermediates
this.Qmin = 120;
this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
var Qmin = this.Qmin;
var Qmax = this.Qmax;
var Qbase = this.Qbase;
var Quser = this.Quser;
this.Counter++;
// Stop the stream after 100 reads and emit the data
if (this.Counter === 100) {
this.emit('completed', this.Qbase, this.Quser);
}
// do some calcs on this.Qbase
this.push('something not important');
callback();
};
// export the object
module.exports = TransformStream;
编辑:
此外,我不知道你的计数器有多高,但如果你填满了缓冲区,它将停止将数据传递到转换流,在这种情况下 completed
永远不会真正命中,因为你永远不会到达计数器限制。尝试更改您的 highwatermark
.
编辑 2:更好的解释
如您所知,transform stream
是双工流,这基本上意味着它可以从源接受数据,并且可以将数据发送到目的地。这通常分别称为读和写。 transform stream
继承自 read stream
和 Node.js 实现的 write stream
。不过有一点需要注意,transform stream
不必实现 _read 或 _write 函数。 从这个意义上说,您可以将其视为鲜为人知的 passthrough stream.
如果您考虑 transform stream
实现 write stream
这一事实,您还必须考虑这样一个事实,即写入流始终有一个转储其内容的目的地。您遇到的问题 是当您创建transform stream
时您无法指定一个地方来发送您的内容。 通过转换流完全传递数据的唯一方法是将其通过管道传输到写入流,否则,实质上您的流会备份并且无法接受更多数据,因为数据没有位置去。
这就是为什么当您通过管道传输到写入流时它始终有效。写入流通过将数据发送到目的地来减轻数据备份,因此您的所有数据都将通过管道传输并发出完成事件。
当样本量较小时,您的代码在没有写入流的情况下工作的原因是您没有填充流,因此转换流可以接受足够的数据以允许完整的 event/threshold被击中。随着阈值的增加,您的流在不将其发送到另一个地方(写入流)的情况下可以接受的数据量保持不变。这会导致您的流备份,并且它不能再接受数据,这意味着将永远不会发出已完成的事件。
我敢说,如果您增加转换流的 highwatermark
,您将能够增加您的阈值并且代码仍然有效。但是这种方法是不正确的。将您的流通过管道传输到写入流,该写入流会将数据发送到 dev/null 创建该写入流的方法是:
var writer = fs.createWriteStream('/dev/null');
Node.js 文档中有关 buffering 的部分解释了您 运行 遇到的错误。
您不中断 _transform 并且进程会走得很远。尝试:
this.emit('completed', ...);
this.end();
这就是为什么 'program seems to terminate before the result gets returned'
并且不要输出任何无用的数据:
var wstream = fs.createWriteStream('/dev/null');
祝你好运)
我建议使用 Writable 而不是转换流。
然后将 _transform
重命名为 _write
并且如果您通过管道传输到它,您的代码将使用该流。正如@Bradgnar 已经指出的那样,转换流需要一个消费者,否则它将 stop the readable 将更多数据推送到其缓冲区。