stream.Transform 在完成输出之前接受新的输入
stream.Transform accept new input before completing output
我正在使用异步操作实现转换流。我的名字叫 Parser
.
var Transform = require('stream').transform;
function Parser(options) {
Transform.call(this, {objectMode: true});
}
Parser.prototype._transform = function _transform(input, encoding, callback) {
var this_ = this;
doSomethingAsync(input, function(output) {
this_.push(output);
//possible location #1 for callback();
});
//possible location #2 for callback();
}
每个传入的块可能需要很长时间才能处理(doSomethingAsync
需要网络请求)。但是,每个块的处理完全独立于之前的块。此外,输出的确切顺序并不重要。每个输出都包含一个标识其输入的描述符,而不是按顺序标识。
因此,我希望 _transform
尽快再次被调用,而不是等到给定的块完全完成处理。所以,看看代码,如果我把 callback()
放在 possible location #1
中,那么 _transform
永远不会被调用,直到每个块都被完全处理。但是如果我把它放在 possible location #2
中,那么我的流在回调之后推送,这会导致这些难看的
Uncaught Error: stream.push() after EOF
流终止后出现错误。
所以我的问题是:是否可以使用转换流来做到这一点?或者我应该考虑使用图书馆吗?如果是,哪种类型(事件流、FRP 等...)?
谢谢。
您可以在您的流上实现 _flush()
,并且仅在所有异步函数完成时调用传递给该函数的回调。像这样:
function Parser(options) {
Transform.call(this, {objectMode: true});
this._pending = 0;
this._flushcb = undefined;
}
Parser.prototype._transform = function _transform(input, encoding, callback) {
var self = this;
++this._pending;
doSomethingAsync(input, function(output) {
self.push(output);
if (--self._pending === 0 && self._flushcb)
self._flushcb();
});
callback();
}
Parser.prototype._flush = function(callback) {
this._flushcb = callback;
};
我认为答案不完整。
假设你有一个像这样的 _transform()
:
_transform(chunk, encoding, done) {
let data = chunk.toString();
this.rest += data;
[this.toPush, this.rest] = this.f(this.rest);
for (let i = 0; i < this.toPush.length; i++) {
if (!this.push(this.toPush[i])) {
this._source.readStop();
break;
}
}
done()
}
```
其中 f
是,例如,将接收到的块分成段落的函数。 rest
是块末尾的东西 f
无法确定它是否是整个段落,因此需要更多数据(另一个块)。当所有内容都被阅读后,可以假设 rest
是一整段,然后 _flush
用于推送它,如下所示。抛出上述异常,可能是因为 "<p>"+this.rest+"</p>"
大于 this.rest
。这不是真正预期的行为...
_flush(done) {
if (this.rest !== "") this.push("<p>"+this.rest+"</p>");
this.rest = null;
this.toPush = null;
done()
}
编辑:
所以 Calvin Metcalf 在这里给了我一个工作https://github.com/nodejs/readable-stream/issues/207:在节点 8.0.0 上可以使用 _final 而不是 _flush。
这个问题看起来很不稳定,因为他没有在他的环境中复制。
我正在使用异步操作实现转换流。我的名字叫 Parser
.
var Transform = require('stream').transform;
function Parser(options) {
Transform.call(this, {objectMode: true});
}
Parser.prototype._transform = function _transform(input, encoding, callback) {
var this_ = this;
doSomethingAsync(input, function(output) {
this_.push(output);
//possible location #1 for callback();
});
//possible location #2 for callback();
}
每个传入的块可能需要很长时间才能处理(doSomethingAsync
需要网络请求)。但是,每个块的处理完全独立于之前的块。此外,输出的确切顺序并不重要。每个输出都包含一个标识其输入的描述符,而不是按顺序标识。
因此,我希望 _transform
尽快再次被调用,而不是等到给定的块完全完成处理。所以,看看代码,如果我把 callback()
放在 possible location #1
中,那么 _transform
永远不会被调用,直到每个块都被完全处理。但是如果我把它放在 possible location #2
中,那么我的流在回调之后推送,这会导致这些难看的
Uncaught Error: stream.push() after EOF
流终止后出现错误。
所以我的问题是:是否可以使用转换流来做到这一点?或者我应该考虑使用图书馆吗?如果是,哪种类型(事件流、FRP 等...)?
谢谢。
您可以在您的流上实现 _flush()
,并且仅在所有异步函数完成时调用传递给该函数的回调。像这样:
function Parser(options) {
Transform.call(this, {objectMode: true});
this._pending = 0;
this._flushcb = undefined;
}
Parser.prototype._transform = function _transform(input, encoding, callback) {
var self = this;
++this._pending;
doSomethingAsync(input, function(output) {
self.push(output);
if (--self._pending === 0 && self._flushcb)
self._flushcb();
});
callback();
}
Parser.prototype._flush = function(callback) {
this._flushcb = callback;
};
我认为答案不完整。
假设你有一个像这样的 _transform()
:
_transform(chunk, encoding, done) {
let data = chunk.toString();
this.rest += data;
[this.toPush, this.rest] = this.f(this.rest);
for (let i = 0; i < this.toPush.length; i++) {
if (!this.push(this.toPush[i])) {
this._source.readStop();
break;
}
}
done()
}
```
其中 f
是,例如,将接收到的块分成段落的函数。 rest
是块末尾的东西 f
无法确定它是否是整个段落,因此需要更多数据(另一个块)。当所有内容都被阅读后,可以假设 rest
是一整段,然后 _flush
用于推送它,如下所示。抛出上述异常,可能是因为 "<p>"+this.rest+"</p>"
大于 this.rest
。这不是真正预期的行为...
_flush(done) {
if (this.rest !== "") this.push("<p>"+this.rest+"</p>");
this.rest = null;
this.toPush = null;
done()
}
编辑: 所以 Calvin Metcalf 在这里给了我一个工作https://github.com/nodejs/readable-stream/issues/207:在节点 8.0.0 上可以使用 _final 而不是 _flush。 这个问题看起来很不稳定,因为他没有在他的环境中复制。