节点 - 将管道步骤抽象为函数

Node - Abstracting Pipe Steps into Function

我熟悉 Node 流,但我正在努力寻找将我经常重复使用的代码抽象为单个管道步骤的最佳实践。

这是我今天写的内容的精简版:

inputStream
.pipe(csv.parse({columns:true})
.pipe(csv.transform(function(row) {return transform(row); }))
.pipe(csv.stringify({header: true})
.pipe(outputStream);

实际工作发生在 transform()。唯一真正改变的是 inputStreamtransform()outputStream。就像我说的,这是我实际使用的精简版。我在每个管道步骤上都有很多错误处理和日志记录,这最终是我尝试抽象代码的原因。

我要写的是单个管道步骤,如下所示:

inputStream
.pipe(csvFunction(transform(row)))
.pipe(outputStream);

我很难理解的是如何将这些管道步骤变成一个接受流和 returns 流的函数。我看过像 through2 这样的图书馆,但我不确定它是如何让我到达我想去的地方的。

您可以像这样使用 PassThrough class:

var PassThrough = require('stream').PassThrough;

var csvStream = new PassThrough();
csvStream.on('pipe', function (source) {
  // undo piping of source
  source.unpipe(this);
  // build own pipe-line and store internally
  this.combinedStream =
    source.pipe(csv.parse({columns: true}))
      .pipe(csv.transform(function (row) {
        return transform(row);
      }))
      .pipe(csv.stringify({header: true}));
});

csvStream.pipe = function (dest, options) {
  // pipe internal combined stream to dest
  return this.combinedStream.pipe(dest, options);
};

inputStream
  .pipe(csvStream)
  .pipe(outputStream);

这就是我最终的结果。我使用 through2 library and the streaming API of the csv library 创建了我正在寻找的管道函数。

var csv = require('csv');
    through = require('through2');

module.exports = function(transformFunc) {
    parser = csv.parse({columns:true, relax_column_count:true}),
    transformer = csv.transform(function(row) {
        return transformFunc(row);
    }),
    stringifier = csv.stringify({header: true});

    return through(function(chunk,enc,cb){
        var stream = this;

            parser.on('data', function(data){
                transformer.write(data);
            });

            transformer.on('data', function(data){
                stringifier.write(data);
            });

            stringifier.on('data', function(data){
                stream.push(data);
            });

            parser.write(chunk);

            parser.removeAllListeners('data');
            transformer.removeAllListeners('data');
            stringifier.removeAllListeners('data');
            cb();
    })
}

值得注意的是我在最后删除了事件监听器的部分,这是由于 运行 我创建了太多事件监听器的内存错误。我最初尝试通过使用 once 监听事件来解决这个问题,但这阻止了后续块被读取并传递到下一个管道步骤。

如果有人有反馈或其他想法,请告诉我。