如何在使用流异步读取文件时同步处理每个 row/buffer

How to process each row/buffer synchronously when reading a file asynchronously using a stream

如您所见,我有一段 JS 代码,它读取一个 .csv 文件,并为每一行调用一个异步函数(在 4 个不同的函数之间轮流)。

问题是我需要等待第 i 次迭代中的函数结束,然后再进行第 i+1 次迭代

const csv = require('csv-parser');
const fs = require('fs');

// 1-based row counter; i % 4 routes each row to one of the four org functions.
var i=1;

fs.createReadStream('table.csv')
  .pipe(csv())
  .on('data', (row) => {
      // NOTE(review): the orgNcreatePatient calls are async but NOT awaited,
      // so every row is processed in parallel — this is exactly the problem
      // the question is asking about.
      switch(i%4){
          case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      }
    i++;
  })
  .on('end', () => {
    // Fires when the stream has been fully read — possibly BEFORE the
    // un-awaited async calls above have settled.
    console.log('CSV file successfully processed');
  });





  // Placeholder stubs from the question — the "..." bodies are pseudocode
  // (not valid JavaScript) standing in for the real async work performed
  // per patient record.

  async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org2createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org3createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
   ...
  }

  async function org4createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
   ...
  }

我怎样才能得到我想要的? 希望我的问题足够清楚!

您在这里使用的 readStream 是异步的,这意味着每次读取到新数据时都会触发 .on(event, callback),且与任何 callback 的执行无关。也就是说,callback 的执行不会影响读取过程:每次收到 event,callback 都会并行地运行。

这意味着,如果 callback 执行的是异步代码,那么在收到下一次读取的 event 时,很可能仍有该函数的多个实例在并行运行。

注意(Note):这对所有事件都成立,包括 'end' 事件。

如果仅在 callback 上使用 async/await,那只会让该函数内部的逻辑按顺序执行,仍然不会影响数据被读取的节奏。

为此,您需要同时做两件事:在 callback 上使用 async/await(使其内部按顺序执行),并让 callback 手动暂停和恢复流,以避免并行处理。

const csv = require('csv-parser');
const fs = require('fs');

// 1-based row counter; i % 4 selects the handler (cases 1,2,3 then 0 = org4).
let i = 1;

const stream = fs.createReadStream('table.csv').pipe(csv());

// Dispatch table keyed by i % 4 — replaces the repeated switch arms.
const handlers = {
  1: org1createPatient,
  2: org2createPatient,
  3: org3createPatient,
  0: org4createPatient,
};

stream.on('data', async (row) => {
  // Pause the stream so no further 'data' events fire while this row is
  // being processed — this is what serializes the async work.
  stream.pause();

  try {
    await handlers[i % 4](
      row.patientId, row.FirstName, row.LastName, row.Age, row.Sex,
      row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS,
      row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak,
      row.ST_Slope, row.HeartDisease,
    );
  } catch (err) {
    // Without this catch, a rejected createPatient call would become an
    // unhandled rejection AND leave the stream paused forever.
    stream.destroy(err);
    return;
  } finally {
    i++;
  }

  // Resume only after the row was processed successfully.
  stream.resume();
});

stream.on('error', (err) => {
  console.error('CSV processing failed:', err);
});

stream.on('end', () => {
  // Guaranteed: no 'data' handler is still running when this fires.
  console.log('CSV file successfully processed');
});

下面的解决方案使用 iter-ops 库,在这种情况下非常高效,因为 pipe(csv()) 返回的是一个 AsyncIterable,因此应当按 AsyncIterable 的方式来处理它。

既然你不关心那些处理函数返回什么,我们可以用 throttle 来节制(逐行串行)对每一行的处理:

const {pipe, throttle, onEnd, catchError} = require('iter-ops');
const csv = require('csv-parser');
const fs = require('fs');

// csv-parser's output stream is an AsyncIterable, so iter-ops can consume it.
const asyncIterable = fs.createReadStream('table.csv').pipe(csv());

// Lazy pipeline — nothing runs until `i` is iterated (see processCSV below).
const i = pipe(
    asyncIterable,
    // throttle() waits for the returned promise before requesting the next
    // row, which serializes the per-row async work.
    // NOTE: the "..." argument lists are pseudocode from the answer, standing
    // in for the full row.* argument list shown earlier.
    throttle(async (row, index) => {
        switch (index % 4) {
            case 1: await org1createPatient(row.patientId, ...); break;
            case 2: await org2createPatient(row.patientId, ...); break;
            case 3: await org3createPatient(row.patientId, ...); break;
            case 0: await org4createPatient(row.patientId, ...); break;
            default: break;
        }
    }),
    onEnd(s => {
        // s carries iteration stats (count, duration) once the source ends.
        console.log(`Completed ${s.count} rows, in ${s.duration}ms`);
    }),
    catchError((err, ctx) => {
        console.log(`Failed on row with index ${ctx.index}:`, err);
        throw err; // to stop the iteration
    })
);

/**
 * Drains the `i` pipeline. The pipeline is lazy, so consuming it here is
 * what actually triggers reading and processing the CSV.
 */
async function processCSV() {
    // Intentionally empty body: all per-row work happens inside the
    // throttle() operator of the pipeline above.
    for await (const _row of i) {
        // no-op
    }
}

P.S. 我是 iter-ops 的作者。