如何在使用流异步读取文件时同步处理每个 row/buffer
How to process each row/buffer synchronously when reading a file asynchronously using a stream
如您所见,我有一个 js,它采用 .csv 并为每一行调用一个异步函数(迭代 4 个不同的函数)。
问题是我需要等待第 i 次迭代中的函数结束,然后再进行第 i+1 次迭代。
const csv = require('csv-parser');
const fs = require('fs');
// 1-based row counter; i % 4 rotates each row across the four creator functions.
var i=1;
fs.createReadStream('table.csv')
.pipe(csv())
.on('data', (row) => {
// NOTE(review): none of these async calls are awaited, so every row is
// processed in parallel — this is exactly the problem the question asks about.
switch(i%4){
case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
}
i++;
})
// 'end' fires when the CSV stream is exhausted — but the unawaited calls
// above may still be running at that point.
.on('end', () => {
console.log('CSV file successfully processed');
});
// Placeholder stubs — the `...` bodies stand in for the real asynchronous
// patient-creation logic against four different organizations.
async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
async function org2createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
async function org3createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
async function org4createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
我怎样才能得到我想要的?
希望我的问题足够清楚!
您在这里使用的 readStream 是异步的,这意味着每次读取到新数据时都会触发 .on(event, callback),而不受任何已触发的 callback 的影响。也就是说,callback 函数的执行不会影响读取过程:每次收到 event 时,callback 都会并行运行。
这意味着如果 callback 执行一段异步代码,那么在收到下一次读取的 event 时,该函数的多个实例很可能仍在运行。
Note: this holds true for any event, including the 'end'
event.
如果只在 callback 上使用 async/await,那只会使该函数内部的逻辑按顺序执行,仍然不会影响数据的读取节奏。为此,您需要在 callback 上使用 async/await(使其内部同步),同时手动暂停和恢复流,以避免多个 callback 并行执行。
const csv = require('csv-parser');
const fs = require('fs');

// 1-based row counter; i % 4 rotates each row across the four creator functions.
let i = 1;

const stream = fs.createReadStream('table.csv').pipe(csv());

stream.on('data', async (row) => {
  // Pause the overall stream until this row is processed, so the next
  // 'data' event cannot fire while we are still awaiting.
  stream.pause();
  try {
    // process row — dispatch round-robin to one of the four org functions
    switch (i % 4) {
      case 1: await org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 2: await org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 3: await org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 0: await org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    }
    i++;
  } catch (err) {
    // Without this, a rejected creator would leave the stream paused forever
    // and produce an unhandled promise rejection.
    console.error(`Failed processing row ${i}:`, err);
  } finally {
    // Always resume — even when processing threw — so the stream cannot stall.
    stream.resume();
  }
});

stream.on('end', () => {
  // Thanks to pause/resume above, no row handler is still running when this fires.
  console.log('CSV file successfully processed');
});
下面的解决方案使用 iter-ops 库,在这种情况下非常有效,因为 pipe(csv()) 返回的是一个 AsyncIterable,所以应该相应地处理它。既然您不关心那些处理函数的返回值,我们可以对每一行的处理使用 throttle:
const {pipe, throttle, onEnd, catchError} = require('iter-ops');
const csv = require('csv-parser');
const fs = require('fs');
// csv-parser's output stream is an AsyncIterable, which iter-ops consumes directly.
const asyncIterable = fs.createReadStream('table.csv').pipe(csv());
const i = pipe(
    asyncIterable,
    // throttle awaits the handler for each value, so rows are processed
    // strictly one at a time; `...` stands in for the full argument list.
    throttle(async (row, index) => {
        switch (index % 4) {
            case 1: await org1createPatient(row.patientId, ...); break;
            case 2: await org2createPatient(row.patientId, ...); break;
            case 3: await org3createPatient(row.patientId, ...); break;
            case 0: await org4createPatient(row.patientId, ...); break;
            default: break;
        }
    }),
    // called once when the iterable is exhausted; `s` carries iteration stats
    onEnd(s => {
        console.log(`Completed ${s.count} rows, in ${s.duration}ms`);
    }),
    catchError((err, ctx) => {
        console.log(`Failed on row with index ${ctx.index}:`, err);
        throw err; // to stop the iteration
    })
);
// Iteration is lazy: nothing runs until the pipeline is consumed below.
async function processCSV() {
    // this will trigger the iteration:
    for await(const a of i) {
        // iterate and process the CSV
    }
}
P.S. 我是 iter-ops 库的作者。
如您所见,我有一个 js,它采用 .csv 并为每一行调用一个异步函数(迭代 4 个不同的函数)。
问题是我需要等待第 i 次迭代中的函数结束,然后再进行第 i+1 次迭代。
const csv = require('csv-parser');
const fs = require('fs');
// 1-based row counter; i % 4 rotates each row across the four creator functions.
var i=1;
fs.createReadStream('table.csv')
.pipe(csv())
.on('data', (row) => {
// NOTE(review): none of these async calls are awaited, so every row is
// processed in parallel — this is exactly the problem the question asks about.
switch(i%4){
case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
}
i++;
})
// 'end' fires when the CSV stream is exhausted — but the unawaited calls
// above may still be running at that point.
.on('end', () => {
console.log('CSV file successfully processed');
});
// Placeholder stubs — the `...` bodies stand in for the real asynchronous
// patient-creation logic against four different organizations.
async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
async function org2createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
async function org3createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
async function org4createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
...
}
我怎样才能得到我想要的? 希望我的问题足够清楚!
您在这里使用的 readStream 是异步的,这意味着每次读取到新数据时都会触发 .on(event, callback),而不受任何已触发的 callback 的影响。也就是说,callback 函数的执行不会影响读取过程:每次收到 event 时,callback 都会并行运行。
这意味着如果 callback 执行一段异步代码,那么在收到下一次读取的 event 时,该函数的多个实例很可能仍在运行。
Note: this holds true for any event, including the
'end'
event.
如果只在 callback 上使用 async/await,那只会使该函数内部的逻辑按顺序执行,仍然不会影响数据的读取节奏。为此,您需要在 callback 上使用 async/await(使其内部同步),同时手动暂停和恢复流,以避免多个 callback 并行执行。
const csv = require('csv-parser');
const fs = require('fs');

// 1-based row counter; i % 4 rotates each row across the four creator functions.
let i = 1;

const stream = fs.createReadStream('table.csv').pipe(csv());

stream.on('data', async (row) => {
  // Pause the overall stream until this row is processed, so the next
  // 'data' event cannot fire while we are still awaiting.
  stream.pause();
  try {
    // process row — dispatch round-robin to one of the four org functions
    switch (i % 4) {
      case 1: await org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 2: await org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 3: await org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 0: await org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    }
    i++;
  } catch (err) {
    // Without this, a rejected creator would leave the stream paused forever
    // and produce an unhandled promise rejection.
    console.error(`Failed processing row ${i}:`, err);
  } finally {
    // Always resume — even when processing threw — so the stream cannot stall.
    stream.resume();
  }
});

stream.on('end', () => {
  // Thanks to pause/resume above, no row handler is still running when this fires.
  console.log('CSV file successfully processed');
});
下面的解决方案使用 iter-ops 库,在这种情况下非常有效,因为 pipe(csv()) 返回的是一个 AsyncIterable,所以应该相应地处理它。既然您不关心那些处理函数的返回值,我们可以对每一行的处理使用 throttle:
const {pipe, throttle, onEnd, catchError} = require('iter-ops');
const csv = require('csv-parser');
const fs = require('fs');
// csv-parser's output stream is an AsyncIterable, which iter-ops consumes directly.
const asyncIterable = fs.createReadStream('table.csv').pipe(csv());
const i = pipe(
    asyncIterable,
    // throttle awaits the handler for each value, so rows are processed
    // strictly one at a time; `...` stands in for the full argument list.
    throttle(async (row, index) => {
        switch (index % 4) {
            case 1: await org1createPatient(row.patientId, ...); break;
            case 2: await org2createPatient(row.patientId, ...); break;
            case 3: await org3createPatient(row.patientId, ...); break;
            case 0: await org4createPatient(row.patientId, ...); break;
            default: break;
        }
    }),
    // called once when the iterable is exhausted; `s` carries iteration stats
    onEnd(s => {
        console.log(`Completed ${s.count} rows, in ${s.duration}ms`);
    }),
    catchError((err, ctx) => {
        console.log(`Failed on row with index ${ctx.index}:`, err);
        throw err; // to stop the iteration
    })
);
// Iteration is lazy: nothing runs until the pipeline is consumed below.
async function processCSV() {
    // this will trigger the iteration:
    for await(const a of i) {
        // iterate and process the CSV
    }
}
P.S. 我是 iter-ops 库的作者。