CompletableFuture 重用池中的线程
CompletableFuture reuse thread from pool
我正在测试 Completable Future。
如上所述 here
我以为线程会从公共池中重用,但这个片段显示了奇怪的行为
for (int i = 0; i < 10000; i++) {
final int counter = i;
CompletableFuture.supplyAsync(() -> {
System.out.println("Looking up " + counter + " on thread " + Thread.currentThread().getName());
return null;
});
}
我有这样的输出:
Looking up 0 on thread Thread-2
Looking up 1 on thread Thread-3
Looking up 2 on thread Thread-4
...
Looking up 10000 on thread Thread-10002
看起来每个任务都创建了一个新线程。
为什么我所有的 completableFuture 都不重用公共池中的线程?
MoreOver 我已经用 RxJava 进行了测试,它可以像预期的那样用这段代码工作:
for (int i = 0; i < 10000; i++) {
rxJobExecute(i).subscribeOn(Schedulers.io()).subscribe();
}
private Observable<String> rxJobExecute(int i) {
return Observable.fromCallable(() -> {
System.out.println("emission " + i + " on thread " + Thread.currentThread().getName());
return "tata";
});
}
产出
emission 8212 on thread RxIoScheduler-120
emission 8214 on thread RxIoScheduler-120
emission 8216 on thread RxIoScheduler-120
emission 8218 on thread RxIoScheduler-120
emission 8220 on thread RxIoScheduler-120
emission 7983 on thread RxIoScheduler-275
emission 1954 on thread RxIoScheduler-261
emission 1833 on thread RxIoScheduler-449
emission 1890 on thread RxIoScheduler-227
可能是因为您只有 2 个处理器,所以应用程序启动时的值 Runtime.getRuntime().availableProcessors()
仅观察到 1
处理器(avialableProcessors 将 return 1 和您机器上的处理器数量,并不是很确定。
如果并行度为 1,ForkJoin 公共池将使用每个任务线程池。
要强制系统以特定并行度加载(至少为此)定义系统 属性 -Djava.util.concurrent.ForkJoinPool.common.parallelism=2
作为运行时参数
编辑:
我又看了一遍内部逻辑。由于您有 2
个核心,因此并行性将始终使用每个任务的线程。逻辑期望大于或等于 3,因此您需要将并行度更新为 3
-Djava.util.concurrent.ForkJoinPool.common.parallelism=3
.
另一种方法是定义自己的线程池
我正在测试 Completable Future。 如上所述 here 我以为线程会从公共池中重用,但这个片段显示了奇怪的行为
for (int i = 0; i < 10000; i++) {
final int counter = i;
CompletableFuture.supplyAsync(() -> {
System.out.println("Looking up " + counter + " on thread " + Thread.currentThread().getName());
return null;
});
}
我有这样的输出:
Looking up 0 on thread Thread-2
Looking up 1 on thread Thread-3
Looking up 2 on thread Thread-4
...
Looking up 10000 on thread Thread-10002
看起来每个任务都创建了一个新线程。 为什么我所有的 completableFuture 都不重用公共池中的线程?
MoreOver 我已经用 RxJava 进行了测试,它可以像预期的那样用这段代码工作:
for (int i = 0; i < 10000; i++) {
rxJobExecute(i).subscribeOn(Schedulers.io()).subscribe();
}
private Observable<String> rxJobExecute(int i) {
return Observable.fromCallable(() -> {
System.out.println("emission " + i + " on thread " + Thread.currentThread().getName());
return "tata";
});
}
产出
emission 8212 on thread RxIoScheduler-120
emission 8214 on thread RxIoScheduler-120
emission 8216 on thread RxIoScheduler-120
emission 8218 on thread RxIoScheduler-120
emission 8220 on thread RxIoScheduler-120
emission 7983 on thread RxIoScheduler-275
emission 1954 on thread RxIoScheduler-261
emission 1833 on thread RxIoScheduler-449
emission 1890 on thread RxIoScheduler-227
可能是因为您只有 2 个处理器,所以应用程序启动时的值 Runtime.getRuntime().availableProcessors()
仅观察到 1
处理器(avialableProcessors 将 return 1 和您机器上的处理器数量,并不是很确定。
如果并行度为 1,ForkJoin 公共池将使用每个任务线程池。
要强制系统以特定并行度加载(至少为此)定义系统 属性 -Djava.util.concurrent.ForkJoinPool.common.parallelism=2
作为运行时参数
编辑:
我又看了一遍内部逻辑。由于您有 2
个核心,因此并行性将始终使用每个任务的线程。逻辑期望大于或等于 3,因此您需要将并行度更新为 3
-Djava.util.concurrent.ForkJoinPool.common.parallelism=3
.
另一种方法是定义自己的线程池