PapaParse 和高地
PapaParse and Highland
我必须在 NodeJS 中解析一个非常大的 CSV 文件并将其保存在一次最多允许 500 个条目的数据库中(异步操作)。由于内存限制,我必须流式传输 CSV 文件并希望使用 PapaParse 来解析 CSV 文件(在我的情况下效果最好)。
由于 PapaParse 使用回调样式方法来解析 Node.js 流,我没有看到可以轻松组合 highland(用于批处理和数据转换)和 PapaParse。因此,我尝试使用 ParseThrough 流写入数据并使用 highland 读取该流以进行批处理:
const csv = require('papaparse');
const fs = require('fs');
const highland = require('highland');
const { PassThrough } = require('stream');
const passThroughStream = new PassThrough({ objectMode: true });
csv.parse(fileStream, {
step: function(row) {
// Write data to stream
passThroughStream.write(row.data[0]);
},
complete: function() {
// Somehow "end" the stream
passThroughStream.write(null);
},
});
highland(passThroughStream)
.map((data) => {
// data transform
})
.batch(500)
.map((data) => {
// Save up to 500 entries in database (async call)
});
显然,这并不能正常工作,也没有做任何事情。是否有可能甚至是更好的方法来解析非常大的 CSV 文件并将行保存在数据库中(最多 500 个批次)?
编辑:使用 csv
包 (https://www.npmjs.com/package/csv) 可能会像这样(fast-csv
也是如此):
highland(fileStream.pipe(csv.parse()))
.map((data) => {
// data transform
})
.batch(500)
.map((data) => {
// Save up to 500 entries in database (async call)
});
但不幸的是,两个 NPM 包在所有情况下都无法正确解析 CSV 文件。
快速浏览 papaparse
后,我决定在 scramjet
中实施 CSV 解析器。
fileStream.pipe(new scramjet.StringStream('utf-8'))
.csvParse(options)
.batch(500)
.map(items => db.insertArray('some_table', items))
希望对您有用。 :)
我必须在 NodeJS 中解析一个非常大的 CSV 文件并将其保存在一次最多允许 500 个条目的数据库中(异步操作)。由于内存限制,我必须流式传输 CSV 文件并希望使用 PapaParse 来解析 CSV 文件(在我的情况下效果最好)。
由于 PapaParse 使用回调样式方法来解析 Node.js 流,我没有看到可以轻松组合 highland(用于批处理和数据转换)和 PapaParse。因此,我尝试使用 ParseThrough 流写入数据并使用 highland 读取该流以进行批处理:
const csv = require('papaparse');
const fs = require('fs');
const highland = require('highland');
const { PassThrough } = require('stream');
const passThroughStream = new PassThrough({ objectMode: true });
csv.parse(fileStream, {
step: function(row) {
// Write data to stream
passThroughStream.write(row.data[0]);
},
complete: function() {
// Somehow "end" the stream
passThroughStream.write(null);
},
});
highland(passThroughStream)
.map((data) => {
// data transform
})
.batch(500)
.map((data) => {
// Save up to 500 entries in database (async call)
});
显然,这并不能正常工作,也没有做任何事情。是否有可能甚至是更好的方法来解析非常大的 CSV 文件并将行保存在数据库中(最多 500 个批次)?
编辑:使用 csv
包 (https://www.npmjs.com/package/csv) 可能会像这样(fast-csv
也是如此):
highland(fileStream.pipe(csv.parse()))
.map((data) => {
// data transform
})
.batch(500)
.map((data) => {
// Save up to 500 entries in database (async call)
});
但不幸的是,两个 NPM 包在所有情况下都无法正确解析 CSV 文件。
快速浏览 papaparse
后,我决定在 scramjet
中实施 CSV 解析器。
fileStream.pipe(new scramjet.StringStream('utf-8'))
.csvParse(options)
.batch(500)
.map(items => db.insertArray('some_table', items))
希望对您有用。 :)