节点读取流 - 如何限制打开文件的数量?

Node Read Streams - How can I limit the number of open files?

我在流式传输多个文件时 运行 进入 AggregateError: EMFILE: too many open files

机器详细信息: MacOS 蒙特雷, MacBook Pro(14 英寸,2021 年款), 芯片苹果 M1 Pro, 内存 16GB, 节点 v16.13.0

我试过增加限制,但没有成功。 理想情况下,我希望能够设置一次打开文件数的限制,或者在文件被使用后立即关闭文件来解决。

代码如下。我试图删除不相关的代码并将其替换为“//...”。

const MultiStream = require('multistream');
const fs = require('fs-extra'); // Also tried graceful-fs and the standard fs
const { fdir } = require("fdir");
// Also have a require for the bz2 and split2 functions but editing from phone right now

//...

let files = [];

//...

(async() => {

  const crawler = await new fdir()
  .filter((path, isDirectory) => path.endsWith(".bz2"))
  .withFullPaths()
  .crawl("Dir/Sub Dir")
  .withPromise();

  for(const file of crawler){
    files = [...files, fs.createReadStream(file)]
  }

  multi = await new MultiStream(files)
    // Unzip
    .pipe(bz2())
    // Create chunks from lines
    .pipe(split2())
    .on('data', function (obj) {
      // Code to filter data and extract what I need
      //...
    })
    .on("error", function(error) {
      // Handling parsing errors
      //...
    })
    .on('end', function(error) {
      // Output results
      //...
    })

})();

为了防止预先打开阵列中每个文件的文件句柄,您希望仅在轮到特定文件进行流式处理时按需打开文件。而且,您可以使用多流来做到这一点。

根据多流 doc,您可以通过更改此延迟创建 readStreams:

  for(const file of crawler){
    files = [...files, fs.createReadStream(file)]
  }

对此:

  let files = crawler.map((f) => {
      return function() {
          return fs.createReadStream(f);
      }
  });

阅读 multistream 的 npm 页面后,我想我找到了一些有用的东西。我还编辑了将流添加到文件数组的位置,因为我认为不需要实例化新数组并像您正在做的那样传播现有元素。

To lazily create the streams, wrap them in a function:

    var streams = [
      fs.createReadStream(__dirname + '/numbers/1.txt'),
      function () { // will be executed when the stream is active
        return fs.createReadStream(__dirname + '/numbers/2.txt')
      },
      function () { // same
        return fs.createReadStream(__dirname + '/numbers/3.txt')
      }
    ]
    
    new MultiStream(streams).pipe(process.stdout) // => 123 ```

有了它,我们可以通过简单地将 readStreams 包装在函数中来更新您的逻辑以包含此功能,这样在需要它们之前不会创建流​​。这将防止您一次打开太多。我们可以通过简单地更新您的文件循环来做到这一点:

for(const file of crawler){
    files.push(function() {
        return fs.createReadStream(file)
    })
}