具有大量任务的 ExecutorService
ExecutorService with huge number of tasks
我有一个文件列表和一个分析这些文件的分析器列表。文件数量可以很大 (200,000) 和分析器数量 (1000)。所以操作总数可能非常大(200,000,000)。现在,我需要应用多线程来加快速度。我采用了这种方法:
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (File file : listOfFiles) {
for (Analyzer analyzer : listOfAnalyzers){
executor.execute(() -> {
boolean exists = file.exists();
if(exists){
analyzer.analyze(file);
}
});
}
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
但这种方法的问题是它从内存中占用太多,我想有更好的方法来做到这一点。我仍然是 java 和多线程的初学者。
200M 任务将驻留在何处?我希望不在内存中,除非您计划以分布式方式实施您的解决方案。同时,您需要实例化一个 ExecutorService
来 而不是 累积大量队列。与 "caller runs policy" 一起使用(参见 here) when you create the service。如果您尝试在队列已满时将另一个任务放入队列中,您最终将自己执行它,这可能正是您想要的。
OTOH,既然我更认真地看待你的问题,为什么不同时 分析 单个文件呢?然后队列永远不会大于分析器的数量。坦率地说,这就是我要做的,因为我想要一个可读的日志,它在我加载每个文件时以正确的顺序包含一条消息。
很抱歉没有提供更多帮助:
analysts.stream().map(analyst -> executor.submit(() -> analyst.analyze(file))).map(Future::get);
基本上,为单个文件创建一堆未来,然后在继续之前等待 所有 个。
一个想法是采用 fork/join 算法并将项目(文件)分组,以便单独处理它们。
我的建议如下:
- 首先,过滤掉所有不存在的文件——它们不必要地占用资源。
以下伪代码演示了可能对您有所帮助的算法:
public static class CustomRecursiveTask extends RecursiveTask<Integer {
private final Analyzer[] analyzers;
private final int threshold;
private final File[] files;
private final int start;
private final int end;
public CustomRecursiveTask(Analyzer[] analyzers,
final int threshold,
File[] files,
int start,
int end) {
this.analyzers = analyzers;
this.threshold = threshold;
this.files = files;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
final int filesProcessed = end - start;
if (filesProcessed < threshold) {
return processSequentially();
} else {
final int middle = (start + end) / 2;
final int analyzersCount = analyzers.length;
final ForkJoinTask<Integer> left =
new CustomRecursiveTask(analyzers, threshold, files, start, middle);
final ForkJoinTask<Integer> right =
new CustomRecursiveTask(analyzers, threshold, files, middle + 1, end);
left.fork();
right.fork();
return left.join() + right.join();
}
}
private Integer processSequentially() {
for (int i = start; i < end; i++) {
File file = files[i];
for(Analyzer analyzer : analyzers) { analyzer.analyze(file) };
}
return 1;
}
}
用法如下所示:
public static void main(String[] args) {
final Analyzer[] analyzers = new Analyzer[]{};
final File[] files = new File[] {};
final int threshold = files.length / 5;
ForkJoinPool.commonPool().execute(
new CustomRecursiveTask(
analyzers,
threshold,
files,
0,
files.length
)
);
}
请注意,根据约束,您可以操纵任务的构造函数参数,以便算法将根据文件数量进行调整。
您可以根据文件的数量指定不同的 threshold
s。
final int threshold;
if(files.length > 100_000) {
threshold = files.length / 4;
} else {
threshold = files.length / 8;
}
您还可以根据输入数量在 ForkJoinPool
中指定工作线程的数量。
测量、调整、修改,最终你会解决问题。
希望对您有所帮助。
更新:
如果对结果分析不感兴趣,可以将RecursiveTask
替换为RecursiveAction
。伪代码在两者之间添加了自动装箱开销。
我有一个文件列表和一个分析这些文件的分析器列表。文件数量可以很大 (200,000) 和分析器数量 (1000)。所以操作总数可能非常大(200,000,000)。现在,我需要应用多线程来加快速度。我采用了这种方法:
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (File file : listOfFiles) {
for (Analyzer analyzer : listOfAnalyzers){
executor.execute(() -> {
boolean exists = file.exists();
if(exists){
analyzer.analyze(file);
}
});
}
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
但这种方法的问题是它从内存中占用太多,我想有更好的方法来做到这一点。我仍然是 java 和多线程的初学者。
200M 任务将驻留在何处?我希望不在内存中,除非您计划以分布式方式实施您的解决方案。同时,您需要实例化一个 ExecutorService
来 而不是 累积大量队列。与 "caller runs policy" 一起使用(参见 here) when you create the service。如果您尝试在队列已满时将另一个任务放入队列中,您最终将自己执行它,这可能正是您想要的。
OTOH,既然我更认真地看待你的问题,为什么不同时 分析 单个文件呢?然后队列永远不会大于分析器的数量。坦率地说,这就是我要做的,因为我想要一个可读的日志,它在我加载每个文件时以正确的顺序包含一条消息。
很抱歉没有提供更多帮助:
analysts.stream().map(analyst -> executor.submit(() -> analyst.analyze(file))).map(Future::get);
基本上,为单个文件创建一堆未来,然后在继续之前等待 所有 个。
一个想法是采用 fork/join 算法并将项目(文件)分组,以便单独处理它们。
我的建议如下:
- 首先,过滤掉所有不存在的文件——它们不必要地占用资源。
以下伪代码演示了可能对您有所帮助的算法:
public static class CustomRecursiveTask extends RecursiveTask<Integer { private final Analyzer[] analyzers; private final int threshold; private final File[] files; private final int start; private final int end; public CustomRecursiveTask(Analyzer[] analyzers, final int threshold, File[] files, int start, int end) { this.analyzers = analyzers; this.threshold = threshold; this.files = files; this.start = start; this.end = end; } @Override protected Integer compute() { final int filesProcessed = end - start; if (filesProcessed < threshold) { return processSequentially(); } else { final int middle = (start + end) / 2; final int analyzersCount = analyzers.length; final ForkJoinTask<Integer> left = new CustomRecursiveTask(analyzers, threshold, files, start, middle); final ForkJoinTask<Integer> right = new CustomRecursiveTask(analyzers, threshold, files, middle + 1, end); left.fork(); right.fork(); return left.join() + right.join(); } } private Integer processSequentially() { for (int i = start; i < end; i++) { File file = files[i]; for(Analyzer analyzer : analyzers) { analyzer.analyze(file) }; } return 1; } }
用法如下所示:
public static void main(String[] args) {
final Analyzer[] analyzers = new Analyzer[]{};
final File[] files = new File[] {};
final int threshold = files.length / 5;
ForkJoinPool.commonPool().execute(
new CustomRecursiveTask(
analyzers,
threshold,
files,
0,
files.length
)
);
}
请注意,根据约束,您可以操纵任务的构造函数参数,以便算法将根据文件数量进行调整。
您可以根据文件的数量指定不同的 threshold
s。
final int threshold;
if(files.length > 100_000) {
threshold = files.length / 4;
} else {
threshold = files.length / 8;
}
您还可以根据输入数量在 ForkJoinPool
中指定工作线程的数量。
测量、调整、修改,最终你会解决问题。
希望对您有所帮助。
更新:
如果对结果分析不感兴趣,可以将RecursiveTask
替换为RecursiveAction
。伪代码在两者之间添加了自动装箱开销。