Read and write to csv file with Node.js fast-csv library
I might lack some in-depth understanding of streams in general, but I would like to know how efficiently what I need can be done.
I want to read a CSV file, make a query to a database (or an API) for every row and append the result to that row, and then write the rows with the appended data to a new CSV file. For this I am using the fast-csv Node library.
Here is my implementation:
const fs = require("fs");
const csv = require("fast-csv");

const delay = t => new Promise(resolve => setTimeout(resolve, t));

const asyncFunction = async (row, csvStream) => {
  // Imitate some work against a database
  await delay(1200);
  row.data = "data";
  csvStream.write(row);
};

const array = [];

const csvStream = csv.format({ headers: true });
const writeStream = fs.createWriteStream("output.csv");

csvStream.pipe(writeStream).on("finish", () => {
  console.log("End of writing");
});

fs.createReadStream("input.csv")
  .pipe(csv.parse({ headers: true }))
  .transform(async function(row, next) {
    array.push(asyncFunction(row, csvStream));
    next();
  })
  .on("finish", function() {
    console.log("finished reading file");
    // Wait for all database requests and writes to finish before closing the write stream
    Promise.all(array).then(() => {
      csvStream.end();
      console.log("finished writing file");
    });
  });
What I am especially wondering is whether there is a way to optimize what I am doing here, because I feel I am missing something important about how this library is meant to be used for cases like this.
Regards,
Rokas
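For reference, fast-csv's parse stream also accepts an asynchronous transform callback, so a minimal variation of the code above (with delay() still standing in for the real database/API call) is to resolve the lookup before calling next() and pipe the parser straight into the formatter. That keeps only one lookup in flight at a time, trading throughput for back-pressure instead of buffering an unbounded array of pending promises:
const fs = require("fs");
const csv = require("fast-csv");

const delay = t => new Promise(resolve => setTimeout(resolve, t));

fs.createReadStream("input.csv")
  .pipe(csv.parse({ headers: true }))
  .transform((row, next) => {
    // finish the lookup before signalling next(), so the parser waits for each row
    delay(1200)
      .then(() => next(null, { ...row, data: "data" }))
      .catch(next);
  })
  .pipe(csv.format({ headers: true }))
  .pipe(fs.createWriteStream("output.csv"))
  .on("finish", () => console.log("End of writing"));
The batching approach in the answer below keeps the same back-pressure while amortizing the per-row cost.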
I was able to find a solution in the fast-csv issues section. A kind person, doug-martin, provided this gist showing how to do this kind of operation efficiently with a transform stream:
const path = require('path');
const fs = require('fs');
const { Transform } = require('stream');
const csv = require('fast-csv');

class PersistStream extends Transform {
  constructor(args) {
    super({ objectMode: true, ...(args || {}) });
    this.batchSize = 100;
    this.batch = [];
    if (args && args.batchSize) {
      this.batchSize = args.batchSize;
    }
  }

  _transform(record, encoding, callback) {
    this.batch.push(record);
    if (this.shouldSaveBatch) {
      // we have hit our batch size, so process the records as a batch
      this.processRecords()
        // we successfully processed the records, so callback
        .then(() => callback())
        // an error occurred!
        .catch(err => callback(err));
      return;
    }
    // we shouldn't persist yet, so just continue
    callback();
  }

  _flush(callback) {
    if (this.batch.length) {
      // handle any leftover records that were not persisted because the last batch was too small
      this.processRecords()
        // we successfully processed the records, so callback
        .then(() => callback())
        // an error occurred!
        .catch(err => callback(err));
      return;
    }
    // no records to persist, so just call callback
    callback();
  }

  pushRecords(records) {
    // emit each record for downstream processing
    records.forEach(r => this.push(r));
  }

  get shouldSaveBatch() {
    // this could be any check; for this example it is the record count
    return this.batch.length >= this.batchSize;
  }

  async processRecords() {
    // save the records
    const records = await this.saveBatch();
    // be sure to emit them
    this.pushRecords(records);
    return records;
  }

  async saveBatch() {
    const records = this.batch;
    this.batch = [];
    console.log(`Saving batch [noOfRecords=${records.length}]`);
    // this is where you would save/update/delete the records
    return new Promise(res => {
      setTimeout(() => res(records), 100);
    });
  }
}

const processCsv = ({ file, batchSize }) =>
  new Promise((res, rej) => {
    let recordCount = 0;
    fs.createReadStream(file)
      // catch file read errors
      .on('error', err => rej(err))
      .pipe(csv.parse({ headers: true }))
      // catch any parsing errors
      .on('error', err => rej(err))
      // pipe into our processing stream
      .pipe(new PersistStream({ batchSize }))
      .on('error', err => rej(err))
      .on('data', () => {
        recordCount += 1;
      })
      .on('end', () => res({ event: 'end', recordCount }));
  });

const file = path.resolve(__dirname, `batch_write.csv`);

// process the file in batches of 5 records
processCsv({ file, batchSize: 5 })
  .then(({ event, recordCount }) => {
    console.log(`Done Processing [event=${event}] [recordCount=${recordCount}]`);
  })
  .catch(e => {
    console.error(e.stack);
  });
https://gist.github.com/doug-martin/b434a04f164c81da82165f4adcb144ec
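To connect the gist back to the original goal of writing a new CSV with the appended data, the PersistStream output can be piped straight into fast-csv's formatter. Below is a minimal sketch, assuming saveBatch() is adapted to return the records enriched by the database/API call; the input.csv and output.csv names are placeholders:
const path = require('path');
const fs = require('fs');
const csv = require('fast-csv');
// PersistStream is the class from the gist above

fs.createReadStream(path.resolve(__dirname, 'input.csv'))
  .pipe(csv.parse({ headers: true }))
  // batch the database/API calls and re-emit the enriched records
  .pipe(new PersistStream({ batchSize: 100 }))
  // serialize the enriched records into CSV rows
  .pipe(csv.format({ headers: true }))
  .pipe(fs.createWriteStream(path.resolve(__dirname, 'output.csv')))
  .on('finish', () => console.log('finished writing output.csv'));
Error listeners can be attached at each stage in the same way processCsv does above.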