默认的 ForkJoinPool 执行器需要很长时间
Default ForkJoinPool executor taking long time
我正在使用 CompletableFuture 异步执行从列表源生成的流。
所以我正在测试重载方法,即 CompletableFuture 的“supplyAsync”,其中一种方法只采用单个供应商参数,另一种采用供应商参数和执行器参数。
这是两者的文档:
一个
supplyAsync(Supplier supplier)
Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
秒
supplyAsync(Supplier supplier, Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
这是我的测试 class:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}
public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}
}
class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}
虽然“useCompletableFuture”方法大约需要 4 秒才能完成,但“useCompletableFutureWithExecutor”方法只需 1 秒即可完成。
不,我的问题是,ForkJoinPool.commonPool() 有哪些不同的处理方式可以产生开销?我们不应该总是更喜欢自定义执行程序池而不是 ForkJoinPool 吗?
检查 ForkJoinPool.commonPool()
大小。默认情况下,它会创建一个大小为
的池
Runtime.getRuntime().availableProcessors() - 1
我 运行 你在我的 Intel i7-4800MQ(4 核 + 4 虚拟核)上的例子,在我的例子中公共池的大小是 7
,所以整个计算花费了 ~2000女士:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
在第二种情况下你使用了
Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
所以池有 10 个线程准备执行计算,所以所有任务都在 ~1000 毫秒内 运行:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
ForkJoinPool
和 ExecutorService
之间的区别
Eugene 在他的评论中还提到了一件更重要的事情。 ForkJoinPool
使用工作窃取方法:
A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.
而使用 .newFixedThreadPool()
创建的 ExecutorService
使用分而治之的方法。
如何确定池大小?
有一个关于什么是最佳线程池大小的问题,您可能会在那里找到有用的信息:
Setting Ideal size of Thread Pool
这个帖子也是一个很好的调查地方:
Custom thread pool in Java 8 parallel stream
进一步检查互联网上的解决方案,我发现我们可以使用以下属性更改 ForkJoinPool 采用的默认池大小:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
因此,这个 属性 可以进一步帮助以更有效的方式和更多的并行性来使用 ForkJoinPool。
我正在使用 CompletableFuture 异步执行从列表源生成的流。
所以我正在测试重载方法,即 CompletableFuture 的“supplyAsync”,其中一种方法只采用单个供应商参数,另一种采用供应商参数和执行器参数。 这是两者的文档:
一个
supplyAsync(Supplier supplier)
Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
秒
supplyAsync(Supplier supplier, Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
这是我的测试 class:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}
public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}
}
class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}
虽然“useCompletableFuture”方法大约需要 4 秒才能完成,但“useCompletableFutureWithExecutor”方法只需 1 秒即可完成。
不,我的问题是,ForkJoinPool.commonPool() 有哪些不同的处理方式可以产生开销?我们不应该总是更喜欢自定义执行程序池而不是 ForkJoinPool 吗?
检查 ForkJoinPool.commonPool()
大小。默认情况下,它会创建一个大小为
Runtime.getRuntime().availableProcessors() - 1
我 运行 你在我的 Intel i7-4800MQ(4 核 + 4 虚拟核)上的例子,在我的例子中公共池的大小是 7
,所以整个计算花费了 ~2000女士:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
在第二种情况下你使用了
Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
所以池有 10 个线程准备执行计算,所以所有任务都在 ~1000 毫秒内 运行:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
ForkJoinPool
和 ExecutorService
之间的区别
Eugene 在他的评论中还提到了一件更重要的事情。 ForkJoinPool
使用工作窃取方法:
A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.
而使用 .newFixedThreadPool()
创建的 ExecutorService
使用分而治之的方法。
如何确定池大小?
有一个关于什么是最佳线程池大小的问题,您可能会在那里找到有用的信息:
Setting Ideal size of Thread Pool
这个帖子也是一个很好的调查地方:
Custom thread pool in Java 8 parallel stream
进一步检查互联网上的解决方案,我发现我们可以使用以下属性更改 ForkJoinPool 采用的默认池大小:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
因此,这个 属性 可以进一步帮助以更有效的方式和更多的并行性来使用 ForkJoinPool。