Java 没有使用所有可用的 CPU
Java does not use all available CPUs
我有一个很长的 运行 计算,我需要对一长串输入进行计算。计算是独立的,所以我想把它们分发到几个 CPU 上。我正在使用 Java 8.
代码的框架如下所示:
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
MyService myService = new MyService(executorService);
List<MyResult> results =
myInputList.stream()
.map(myService::getResultFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList());
executorService.shutdown();
负责计算的主要函数如下所示:
CompletableFuture<MyResult> getResultFuture(MyInput input) {
return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
}
长运行计算是无状态的,不做任何IO。
我希望此代码使用所有可用的 CPU,但它并没有发生。例如,在具有 72 个 CPU 和 numThreads=72
(甚至例如 numThreads=500
)的机器上,cpu 使用率最多为 500-1000%,如 htop 所示:
根据线程转储,许多计算线程正在等待,即:
"pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
所有计算线程都在等待同一个锁。在转储时,只有 5 个计算线程是 RUNNABLE,其余的都是 WAITING。
锁定的原因是什么?为什么我无法使用所有 cpu?
您正在提交作业并随后调用 join()
,等待异步作业完成。
Stream 中间步骤按元素执行,这意味着中间步骤 .map(CompletableFuture::join)
一次在一个元素上运行(甚至更糟,因为它是一个顺序流),而不确保所有元素都经过提交步骤。这会导致线程在等待每个计算完成时阻塞。
您必须在开始调用 join()
之前强制提交所有作业:
List<MyResult> results =
myInputList.stream()
.map(myService::getResultFuture)
.collect(Collectors.toList()).stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
如果你可以将你想用 results
列表做的任何事情表达为当一切都完成时调用的动作,你可以以不阻塞线程的方式实现操作 join()
:
List<CompletableFuture<MyResult>> futures = myInputList.stream()
.map(myService::getResultFuture)
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new))
.thenRun(() -> {
List<MyResult> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// perform action with results
});
它仍然会调用 join()
来检索结果,但是此时,所有期货都已完成,因此调用者不会被阻塞。
我有一个很长的 运行 计算,我需要对一长串输入进行计算。计算是独立的,所以我想把它们分发到几个 CPU 上。我正在使用 Java 8.
代码的框架如下所示:
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
MyService myService = new MyService(executorService);
List<MyResult> results =
myInputList.stream()
.map(myService::getResultFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList());
executorService.shutdown();
负责计算的主要函数如下所示:
CompletableFuture<MyResult> getResultFuture(MyInput input) {
return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
}
长运行计算是无状态的,不做任何IO。
我希望此代码使用所有可用的 CPU,但它并没有发生。例如,在具有 72 个 CPU 和 numThreads=72
(甚至例如 numThreads=500
)的机器上,cpu 使用率最多为 500-1000%,如 htop 所示:
根据线程转储,许多计算线程正在等待,即:
"pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
所有计算线程都在等待同一个锁。在转储时,只有 5 个计算线程是 RUNNABLE,其余的都是 WAITING。
锁定的原因是什么?为什么我无法使用所有 cpu?
您正在提交作业并随后调用 join()
,等待异步作业完成。
Stream 中间步骤按元素执行,这意味着中间步骤 .map(CompletableFuture::join)
一次在一个元素上运行(甚至更糟,因为它是一个顺序流),而不确保所有元素都经过提交步骤。这会导致线程在等待每个计算完成时阻塞。
您必须在开始调用 join()
之前强制提交所有作业:
List<MyResult> results =
myInputList.stream()
.map(myService::getResultFuture)
.collect(Collectors.toList()).stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
如果你可以将你想用 results
列表做的任何事情表达为当一切都完成时调用的动作,你可以以不阻塞线程的方式实现操作 join()
:
List<CompletableFuture<MyResult>> futures = myInputList.stream()
.map(myService::getResultFuture)
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new))
.thenRun(() -> {
List<MyResult> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// perform action with results
});
它仍然会调用 join()
来检索结果,但是此时,所有期货都已完成,因此调用者不会被阻塞。