Node stream: pull n lines, transform and continue
In my Node.js app, I want to read a large file, process it in chunks (n lines at a time), use the data, and then move on to the next n lines.
I have tried several modules (mainly fs, event-stream and node-etl), but I can't get the behaviour I want.
The best I have managed (code below) does not wait for the transform to finish before processing the next lines.
Here is my snippet:
const fs = require('fs');
const es = require('event-stream');
const parse = require('csv-parse');
const stringify = require('csv-stringify');
const etl = require('etl');

exports.toDatabase = (file, done) => {
  // File contains six lines with two values each (example: aa;zz for the first line)
  let input = fs.createReadStream(file);
  input
    .pipe(es.split())
    .pipe(etl.collect(2))
    .pipe(es.map((data, nextMap) => {
      // I'd like all of this to finish before the stream reads more lines
      let date = Date.now();
      console.log('map data ' + date);
      console.log(data);
      parse(data[0], {
        delimiter: ';'
      }, (err, output) => {
        console.log('Row done ' + date);
        // The real processing here would be inserting `output` into the database
        console.log(output);
        console.log('------ ' + date);
        return nextMap();
      });
    }));
};
But the output shows that the next map call starts before the first one has finished:
Test!
map data 1481824486765
[ 'aa;zz', 'bb;cc' ]
map data 1481824486771
[ 'dd;ee', 'ff;gg' ]
Row done 1481824486765
[ [ 'aa', 'zz' ] ]
------ 1481824486765
Row done 1481824486771
[ [ 'dd', 'ee' ] ]
------ 1481824486771
map data 1481824486785
[ 'hh;ii', '' ]
Row done 1481824486785
[ [ 'hh', 'ii' ] ]
------ 1481824486785
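From the timestamps it looks like es.map keeps several mapper calls in flight at once: the second 'map data' line is logged before the first 'Row done', so calling nextMap later does not stop new lines from being read in the meantime.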
Not sure why you want to re-chunk the stream, but here you go:
var through = require('through2');
var split = require('split');
var fs = require('fs');

var handler = (function (len) {
  var buff = [];
  var p = 0;
  return through.obj(function (chunk, enc, cb) {
    buff.push(chunk);
    p++;
    // Once len lines have been collected, push them downstream as one batch.
    if (buff.length === len) {
      console.log('send--------------');
      this.push(buff);
      buff = [];
    }
    // Demo only: abort the stream after 25 lines.
    if (p > 25) {
      this.emit('error', new Error('kill the stream'));
    }
    cb();
  }, function (cb) {
    // Flush whatever is left (fewer than len lines) when the input ends.
    this.push(buff);
    buff = [];
    cb();
  });
})(4);

var origin = fs.createReadStream('tomate.csv');

origin
  .pipe(split())
  .pipe(handler)
  .pipe(through.obj(function (chunk, enc, cb) {
    console.log('process: %j', chunk);
    cb();
  }));

handler.on('error', function () {
  origin.close();
  // Still need to unpipe everything; use mississippi.
});
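To connect this back to the question: the part that actually makes the stream wait is deferring the transform callback until the async work is done; through2 will not feed the next chunk until cb() fires. Below is a minimal, self-contained sketch along those lines, using a hypothetical insertIntoDb helper in place of a real database call and a batch size of 2 as in the question:

var fs = require('fs');
var split = require('split');
var through = require('through2');
var parse = require('csv-parse');

// Hypothetical stand-in for the real database insert; any async function
// that takes a completion callback would behave the same way here.
function insertIntoDb(rows, done) {
  setTimeout(function () {
    console.log('inserted', rows);
    done();
  }, 100);
}

var buff = [];

fs.createReadStream('tomate.csv')
  .pipe(split())
  .pipe(through.obj(function (line, enc, cb) {
    buff.push(line);
    if (buff.length < 2) return cb(); // keep collecting until we have 2 lines
    var batch = buff.join('\n');
    buff = [];
    parse(batch, { delimiter: ';' }, function (err, rows) {
      if (err) return cb(err);
      // cb() only fires after the insert completes, so the stream
      // does not read further lines in the meantime.
      insertIntoDb(rows, cb);
    });
  }, function (cb) {
    // Flush a trailing batch smaller than 2 lines, if any.
    if (!buff.length) return cb();
    parse(buff.join('\n'), { delimiter: ';' }, function (err, rows) {
      if (err) return cb(err);
      insertIntoDb(rows, cb);
    });
  }));

The same deferred-callback trick could be dropped into the final through.obj consumer of the answer above, in place of the plain console.log.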