具有大量任务的 ExecutorService

ExecutorService with huge number of tasks

我有一个文件列表和一个分析这些文件的分析器列表。文件数量可以很大 (200,000) 和分析器数量 (1000)。所以操作总数可能非常大(200,000,000)。现在,我需要应用多线程来加快速度。我采用了这种方法:

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (File file : listOfFiles) {
  for (Analyzer analyzer : listOfAnalyzers){
    executor.execute(() -> {
      boolean exists = file.exists();
      if(exists){
        analyzer.analyze(file);
      }
    });
  }
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

但这种方法的问题是它从内存中占用太多,我想有更好的方法来做到这一点。我仍然是 java 和多线程的初学者。

200M 任务将驻留在何处?我希望不在内存中,除非您计划以分布式方式实施您的解决方案。同时,您需要实例化一个 ExecutorService 而不是 累积大量队列。与 "caller runs policy" 一起使用(参见 here) when you create the service。如果您尝试在队列已满时将另一个任务放入队列中,您最终将自己执行它,这可能正是您想要的。

OTOH,既然我更认真地看待你的问题,为什么不同时 分析 单个文件呢?然后队列永远不会大于分析器的数量。坦率地说,这就是我要做的,因为我想要一个可读的日志,它在我加载每个文件时以正确的顺序包含一条消息。

很抱歉没有提供更多帮助:

analysts.stream().map(analyst -> executor.submit(() -> analyst.analyze(file))).map(Future::get);

基本上,为单个文件创建一堆未来,然后在继续之前等待 所有 个。

一个想法是采用 fork/join 算法并将项目(文件)分组,以便单独处理它们。

我的建议如下:

  1. 首先,过滤掉所有不存在的文件——它们不必要地占用资源。
  2. 以下伪代码演示了可能对您有所帮助的算法:

    public static class CustomRecursiveTask extends RecursiveTask<Integer {
    
    private final Analyzer[] analyzers;
    
    private final int threshold;
    
    private final File[] files;
    
    private final int start;
    
    private final int end;
    
    public CustomRecursiveTask(Analyzer[] analyzers,
                               final int threshold,
                               File[] files,
                               int start,
                               int end) {
        this.analyzers = analyzers;
        this.threshold = threshold;
        this.files = files;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        final int filesProcessed = end - start;
        if (filesProcessed < threshold) {
            return processSequentially();
        } else {
            final int middle = (start + end) / 2;
            final int analyzersCount = analyzers.length;
    
            final ForkJoinTask<Integer> left =
                    new CustomRecursiveTask(analyzers, threshold, files, start, middle);
            final ForkJoinTask<Integer> right =
                    new CustomRecursiveTask(analyzers, threshold, files, middle + 1, end);
            left.fork();
            right.fork();
    
            return left.join() + right.join();
        }
    }
    
    private Integer processSequentially() {
        for (int i = start; i < end; i++) {
            File file = files[i];   
            for(Analyzer analyzer : analyzers) { analyzer.analyze(file) };
        }
    
        return 1;
    }
    }
    

用法如下所示:

 public static void main(String[] args) {
    final Analyzer[] analyzers = new Analyzer[]{};
    final File[] files = new File[] {};

    final int threshold = files.length / 5;

    ForkJoinPool.commonPool().execute(
            new CustomRecursiveTask(
                    analyzers,
                    threshold,
                    files,
                    0,
                    files.length
            )
    );
}

请注意,根据约束,您可以操纵任务的构造函数参数,以便算法将根据文件数量进行调整。

您可以根据文件的数量指定不同的 thresholds。

final int threshold;
if(files.length > 100_000) {
   threshold = files.length / 4;
} else {
   threshold = files.length / 8;
}

您还可以根据输入数量在 ForkJoinPool 中指定工作线程的数量。

测量、调整、修改,最终你会解决问题。

希望对您有所帮助。

更新:

如果对结果分析不感兴趣,可以将RecursiveTask替换为RecursiveAction。伪代码在两者之间添加了自动装箱开销。