Node stream: pull n lines, transform and continue

In my Node.js app I want to read a big file, process it piece by piece (n lines at a time), use the data, and then move on to the next n lines.

I have tried several modules (mainly fs, event-stream and node-etl), but I cannot get the behaviour I want.

The best I have managed (code below) does not wait for the transform to finish before processing the next lines.

Here is my snippet:

const fs = require('fs');
const es = require('event-stream');
const parse = require('csv-parse');
const stringify = require('csv-stringify');
const etl = require('etl');

exports.toDatabase = (file, done) => {

  // File contains six lines with two values each (example: aa;bb for the first line)
  let input = fs.createReadStream(file);

  input
    .pipe(es.split())
    .pipe(etl.collect(2))
    .pipe(es.map((data, nextMap) => {
      // I'd like to process all this code before continuing to read my stream
      let date = Date.now();
      console.log('map data ' + date);
      console.log(data);

      parse(data[0], {
        delimiter: ';'
      }, (err, output) => {
        console.log('Row done ' + date);
        // Treatment to do would be to insert in database the output
        console.log(output);
        console.log('------ ' + date);
        return nextMap();
      });

    }));

};

But the output shows that the next map starts before the first call has finished (as far as I can tell, es.map hands each chunk to the mapper as soon as it arrives, so the asynchronous parse calls overlap):

Test!
map data 1481824486765
[ 'aa;zz', 'bb;cc' ]
map data 1481824486771
[ 'dd;ee', 'ff;gg' ]
Row done 1481824486765
[ [ 'aa', 'zz' ] ]
------ 1481824486765
Row done 1481824486771
[ [ 'dd', 'ee' ] ]
------ 1481824486771
map data 1481824486785
[ 'hh;ii', '' ]
Row done 1481824486785
[ [ 'hh', 'ii' ] ]
------ 1481824486785
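
From what I can tell, what I need is the guarantee of Node's built-in stream.Transform: the next chunk is only delivered once the previous callback has fired. A minimal sketch of the behaviour I am after (the name serialBatch is mine, and setTimeout stands in for the real database insert):

const { Transform } = require('stream');

const serialBatch = new Transform({
  objectMode: true,
  transform(batch, enc, cb) {
    // stand-in for the real asynchronous treatment (parse + insert)
    setTimeout(() => {
      console.log('batch done', batch);
      cb(); // only now is the next batch delivered
    }, 100);
  }
});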

Not sure why you want to re-chunk the stream, but here you go:

var through = require('through2');
var split = require('split');
var fs = require('fs');

// Re-chunk the incoming line stream into arrays of `len` lines.
var handler = (function(len) {
  var buff = [];
  var p = 0;
  return through.obj(function(chunk, enc, cb) {
    buff.push(chunk);
    p++;
    if(buff.length===len) {
      console.log('send--------------');
      this.push(buff);
      buff = [];
    }
    if (p > 25) {
      // demo safety valve: abort the stream after 25 lines
      this.emit('error', new Error('kill the stream'));
    }
    cb();
  }, function (cb) {
    if (buff.length) this.push(buff); // the final batch may hold fewer than len lines
    buff = [];
    cb();
  });
})(4);

var origin = fs.createReadStream('tomate.csv');

origin
.pipe(split())
.pipe(handler)
.pipe(through.obj(function(chunk, enc, cb){
  console.log('process: %j', chunk);
  cb()
}));

handler.on('error', function () {
  origin.close();
  // everything still needs to be unpiped; consider the mississippi package
});
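
If each batch needs asynchronous work, as the csv-parse call in the question does, the last stage gives you the same guarantee: through2 does not hand over the next batch until cb is called. A sketch of that stage under the question's assumptions (semicolon-separated values), replacing the logging stage above:

var parse = require('csv-parse');

origin
.pipe(split())
.pipe(handler)
.pipe(through.obj(function (batch, enc, cb) {
  // the handler is held back until cb fires
  parse(batch.join('\n'), { delimiter: ';' }, function (err, rows) {
    if (err) return cb(err);
    console.log('rows done', rows);
    cb(); // now the next batch of len lines flows in
  });
}));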