NodeJS、promises、streams——处理大型 CSV 文件
NodeJS, promises, streams - processing large CSV files
我需要构建一个函数来处理大型 CSV 文件,以便在 bluebird.map() 调用中使用。考虑到文件的潜在大小,我想使用流式处理。
这个函数应该接受一个流(一个 CSV 文件)和一个函数(处理来自流的块)和 return 当文件被读取结束(解决)或错误(拒绝)时的承诺).
所以,我开始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
现在,我有两个相互关联的问题:
- 我需要限制正在处理的实际数据量,以免造成内存压力。
- 作为
processor
参数传递的函数通常是异步的,例如通过基于 promise 的库将文件内容保存到数据库(现在:pg-promise
).因此,它将在记忆中创造一个承诺并继续前进。
pg-promise
库具有管理此功能的函数,例如 page(),但我无法了解如何将流事件处理程序与这些 promise 方法混合使用。现在,我 return 在每个 read()
之后的 readable
部分的处理程序中承诺,这意味着我创建了大量承诺的数据库操作并最终因为我命中进程内存而出错限制。
有没有人有我可以用作跳转点的工作示例?
更新:给猫剥皮的方法可能不止一种,但这行得通:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
有人发现这种方法存在潜在问题吗?
所以说您不想要流式传输,而是想要某种数据块? ;-)
你知道https://github.com/substack/stream-handbook吗?
我认为不改变架构的最简单方法是某种承诺池。例如https://github.com/timdp/es6-promise-pool
在下面找到一个完整的应用程序,它可以正确执行您想要的同类任务:它将文件作为流读取,将其解析为 CSV 并将每一行插入到数据库中。
const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});
const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');
const db = pgp(cn);
function receiver(_, data) {
function source(index) {
if (index < data.length) {
// here we insert just the first column value that contains a prime number;
return this.none('insert into primes values()', data[index][0]);
}
}
return this.sequence(source);
}
db.task(t => {
return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
.then(data => {
console.log('DATA:', data);
}
.catch(error => {
console.log('ERROR:', error);
});
请注意,我唯一更改的是:使用库 csv-parse
而不是 csv
,作为更好的选择。
添加了使用方法 stream.read from the spex library, which properly serves a Readable 流以与 promises 一起使用。
您可能想看看 promise-streams
var ps = require('promise-streams');
passedStream
.pipe(csv.parse({trim: true}))
.pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
.wait().then(_ => {
console.log("All done!");
});
适用于背压和一切。
我找到了一种稍微好一点的方法来做同样的事情;有更多的控制。这是一个具有精确并行度控制的最小骨架。将并行值作为一个,所有记录都按顺序处理,而无需将整个文件存储在内存中,我们可以增加并行值以加快处理速度。
const csv = require('csv');
const csvParser = require('csv-parser')
const fs = require('fs');
const readStream = fs.createReadStream('IN');
const writeStream = fs.createWriteStream('OUT');
const transform = csv.transform({ parallel: 1 }, (record, done) => {
asyncTask(...) // return Promise
.then(result => {
// ... do something when success
return done(null, record);
}, (err) => {
// ... do something when error
return done(null, record);
})
}
);
readStream
.pipe(csvParser())
.pipe(transform)
.pipe(csv.stringify())
.pipe(writeStream);
这允许为每条记录执行异步任务。
为了return一个承诺,我们可以return一个空的承诺,并在流结束时完成它。
.on('end',function() {
//do something wiht csvData
console.log(csvData);
});
我需要构建一个函数来处理大型 CSV 文件,以便在 bluebird.map() 调用中使用。考虑到文件的潜在大小,我想使用流式处理。
这个函数应该接受一个流(一个 CSV 文件)和一个函数(处理来自流的块)和 return 当文件被读取结束(解决)或错误(拒绝)时的承诺).
所以,我开始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
现在,我有两个相互关联的问题:
- 我需要限制正在处理的实际数据量,以免造成内存压力。
- 作为
processor
参数传递的函数通常是异步的,例如通过基于 promise 的库将文件内容保存到数据库(现在:pg-promise
).因此,它将在记忆中创造一个承诺并继续前进。
pg-promise
库具有管理此功能的函数,例如 page(),但我无法了解如何将流事件处理程序与这些 promise 方法混合使用。现在,我 return 在每个 read()
之后的 readable
部分的处理程序中承诺,这意味着我创建了大量承诺的数据库操作并最终因为我命中进程内存而出错限制。
有没有人有我可以用作跳转点的工作示例?
更新:给猫剥皮的方法可能不止一种,但这行得通:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
有人发现这种方法存在潜在问题吗?
所以说您不想要流式传输,而是想要某种数据块? ;-)
你知道https://github.com/substack/stream-handbook吗?
我认为不改变架构的最简单方法是某种承诺池。例如https://github.com/timdp/es6-promise-pool
在下面找到一个完整的应用程序,它可以正确执行您想要的同类任务:它将文件作为流读取,将其解析为 CSV 并将每一行插入到数据库中。
const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});
const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');
const db = pgp(cn);
function receiver(_, data) {
function source(index) {
if (index < data.length) {
// here we insert just the first column value that contains a prime number;
return this.none('insert into primes values()', data[index][0]);
}
}
return this.sequence(source);
}
db.task(t => {
return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
.then(data => {
console.log('DATA:', data);
}
.catch(error => {
console.log('ERROR:', error);
});
请注意,我唯一更改的是:使用库 csv-parse
而不是 csv
,作为更好的选择。
添加了使用方法 stream.read from the spex library, which properly serves a Readable 流以与 promises 一起使用。
您可能想看看 promise-streams
var ps = require('promise-streams');
passedStream
.pipe(csv.parse({trim: true}))
.pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
.wait().then(_ => {
console.log("All done!");
});
适用于背压和一切。
我找到了一种稍微好一点的方法来做同样的事情;有更多的控制。这是一个具有精确并行度控制的最小骨架。将并行值作为一个,所有记录都按顺序处理,而无需将整个文件存储在内存中,我们可以增加并行值以加快处理速度。
const csv = require('csv');
const csvParser = require('csv-parser')
const fs = require('fs');
const readStream = fs.createReadStream('IN');
const writeStream = fs.createWriteStream('OUT');
const transform = csv.transform({ parallel: 1 }, (record, done) => {
asyncTask(...) // return Promise
.then(result => {
// ... do something when success
return done(null, record);
}, (err) => {
// ... do something when error
return done(null, record);
})
}
);
readStream
.pipe(csvParser())
.pipe(transform)
.pipe(csv.stringify())
.pipe(writeStream);
这允许为每条记录执行异步任务。
为了return一个承诺,我们可以return一个空的承诺,并在流结束时完成它。
.on('end',function() {
//do something wiht csvData
console.log(csvData);
});