csv-parser 同步发出的事件

Synchronous Emitted Events With csv-parser

我正在尝试使用 npm 包 csv-parser 来解析我的 csv 文件,但 运行 遇到事件发生顺序的问题。

事件按此顺序发出

  1. 'headers': 想要将关于 csv 的元数据插入数据库和 return 一个 id 值
  2. 'data':想要对所有数据事件
  3. 使用来自headers事件的returned id值
  4. 'data'
  5. 'data'
  6. ...
  7. 结束

显然节点的异步性质意味着我在 'headers' 中的缓慢数据库访问在第一个 'data' 事件发出时还没有 returned,因此我不还没有 csv 的 ID。我能想到的唯一选择是将所有数据行缓存到某个临时变量中,然后在读取整个 csv 后推送所有内容。考虑到我可能有非常大的 csv 文件,这似乎是个坏主意?关于解决此问题的更好方法有什么建议吗?

编辑:添加了一些代码(伪代码,未实际测试)

let headerList = null;
let dataArray = [];
fs.createReadStream(path)
    .pipe(csv())
    // Parse the headers into a comma delimminated string
    .on('headers', function(headers) {
        // some parsing logic and then assigned to variable
        headerList = headers;
    })
    .on('data', function (data) {
        // Some push of data into a variable
        dataArray.push(data);
    })
    .on('end', function() {
        // create the base upload object
        const id = uploads.createUpload(filename, headerList, new Date());

        // insert data
        uploads.insertUploadData(id, dataArray);
    })
  1. 当您获得 headers 事件时,unpipe() 读取流。这会将文件 reader 置于暂停状态,因此您不必在内存中缓冲一堆东西。

  2. 因为数据是以块(通常为 64 kB)的形式从磁盘读取的,CSV 解析器在继续解析当前块时仍会发出 data 事件。您仍然需要在数组中缓冲少量行。

  3. 当您从数据库中获得所需的所有信息时:

    1. 将缓冲的行提交到数据库。

    2. 删除原始的 data 事件处理程序(排队到数组的事件处理程序)并附加一个直接向数据库提交行的事件处理程序。

    3. pipe() 返回 CSV 解析器的读取流。


您可能还想考虑如果您的程序从磁盘读取和解析 CSV 的速度快于数据库接受数据的速度,会发生什么情况。由于没有背压,大量的数据库操作可能最终会在内存中排队,直到您 运行 退出。

如果有许多待处理的数据库操作,您应该暂停文件读取流。