stream.Transform 在完成输出之前接受新的输入

stream.Transform accept new input before completing output

我正在使用异步操作实现转换流。我的名字叫 Parser.

var Transform = require('stream').transform;

function Parser(options) {
  Transform.call(this, {objectMode: true});
}

Parser.prototype._transform = function _transform(input, encoding, callback) {
  var this_ = this;
  doSomethingAsync(input, function(output) {
    this_.push(output);
    //possible location #1 for callback();
  });
  //possible location #2 for callback();
}

每个传入的块可能需要很长时间才能处理(doSomethingAsync 需要网络请求)。但是,每个块的处理完全独立于之前的块。此外,输出的确切顺序并不重要。每个输出都包含一个标识其输入的描述符,而不是按顺序标识。

因此,我希望 _transform 尽快再次被调用,而不是等到给定的块完全完成处理。所以,看看代码,如果我把 callback() 放在 possible location #1 中,那么 _transform 永远不会被调用,直到每个块都被完全处理。但是如果我把它放在 possible location #2 中,那么我的流在回调之后推送,这会导致这些难看的

Uncaught Error: stream.push() after EOF

流终止后出现错误。

所以我的问题是:是否可以使用转换流来做到这一点?或者我应该考虑使用图书馆吗?如果是,哪种类型(事件流、FRP 等...)?

谢谢。

您可以在您的流上实现 _flush(),并且仅在所有异步函数完成时调用传递给该函数的回调。像这样:

function Parser(options) {
  Transform.call(this, {objectMode: true});

  this._pending = 0;
  this._flushcb = undefined;
}

Parser.prototype._transform = function _transform(input, encoding, callback) {
  var self = this;

  ++this._pending;

  doSomethingAsync(input, function(output) {
    self.push(output);
    if (--self._pending === 0 && self._flushcb)
      self._flushcb();
  });

  callback();
}

Parser.prototype._flush = function(callback) {
  this._flushcb = callback;
};

我认为答案不完整。 假设你有一个像这样的 _transform()

_transform(chunk, encoding, done) {
    let data = chunk.toString();
    this.rest += data;
    [this.toPush, this.rest] = this.f(this.rest);
    for (let i = 0; i < this.toPush.length; i++) {
        if (!this.push(this.toPush[i])) {
            this._source.readStop();
            break;
        } 
    }
    done()
}

```

其中 f 是,例如,将接收到的块分成段落的函数。 rest 是块末尾的东西 f 无法确定它是否是整个段落,因此需要更多数据(另一个块)。当所有内容都被阅读后,可以假设 rest 是一整段,然后 _flush 用于推送它,如下所示。抛出上述异常,可能是因为 "<p>"+this.rest+"</p>" 大于 this.rest。这不是真正预期的行为...

 _flush(done) {
    if (this.rest !== "") this.push("<p>"+this.rest+"</p>");
    this.rest = null;
    this.toPush = null;
    done()

 }

编辑: 所以 Calvin Metcalf 在这里给了我一个工作https://github.com/nodejs/readable-stream/issues/207:在节点 8.0.0 上可以使用 _final 而不是 _flush。 这个问题看起来很不稳定,因为他没有在他的环境中复制。