为什么 parallelStream 不使用整个可用的并行性?
Why does parallelStream not use the entire available parallelism?
我创建了一个并行度为 25 的自定义 ForkJoinPool。
customForkJoinPool = new ForkJoinPool(25);
我有一个包含 700 个文件名的列表,我使用这样的代码从 S3 并行下载文件并将它们转换为 Java 个对象:
customForkJoinPool.submit(() -> {
return fileNames
.parallelStream()
.map((fileName) -> {
Logger log = Logger.getLogger("ForkJoinTest");
long startTime = System.currentTimeMillis();
log.info("Starting job at Thread:" + Thread.currentThread().getName());
MyObject obj = readObjectFromS3(fileName);
long endTime = System.currentTimeMillis();
log.info("completed a job with Latency:" + (endTime - startTime));
return obj;
})
.collect(Collectors.toList);
});
});
当我查看日志时,我发现只使用了 5 个线程。并行度为 25,我预计这将使用 25 个线程。下载文件并将其转换为对象的平均延迟约为 200 毫秒。我错过了什么?
可能更好的问题是并行流如何计算在为原始列表创建线程之前将原始列表拆分多少?在这种情况下,看起来它决定将其拆分 5 次并停止。
我认为答案就在这个...来自 ForkJoinPool
javadoc。
"The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization."
在您的情况下,下载将执行阻塞 I/O 操作。
你为什么要用 ForkJoinPool
做这个?它适用于 CPU 绑定的任务,其子任务太快而无法保证单独调度。您的工作负载受 IO 限制,延迟为 200 毫秒,单独的调度开销可以忽略不计。
使用 Executor
:
import static java.util.stream.Collectors.toList;
import static java.util.concurrent.CompletableFuture.supplyAsync;
ExecutorService threads = Executors.newFixedThreadPool(25);
List<MyObject> result = fileNames.stream()
.map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads))
.collect(toList()).stream()
.map(CompletableFuture::join)
.collect(toList());
我创建了一个并行度为 25 的自定义 ForkJoinPool。
customForkJoinPool = new ForkJoinPool(25);
我有一个包含 700 个文件名的列表,我使用这样的代码从 S3 并行下载文件并将它们转换为 Java 个对象:
customForkJoinPool.submit(() -> {
return fileNames
.parallelStream()
.map((fileName) -> {
Logger log = Logger.getLogger("ForkJoinTest");
long startTime = System.currentTimeMillis();
log.info("Starting job at Thread:" + Thread.currentThread().getName());
MyObject obj = readObjectFromS3(fileName);
long endTime = System.currentTimeMillis();
log.info("completed a job with Latency:" + (endTime - startTime));
return obj;
})
.collect(Collectors.toList);
});
});
当我查看日志时,我发现只使用了 5 个线程。并行度为 25,我预计这将使用 25 个线程。下载文件并将其转换为对象的平均延迟约为 200 毫秒。我错过了什么?
可能更好的问题是并行流如何计算在为原始列表创建线程之前将原始列表拆分多少?在这种情况下,看起来它决定将其拆分 5 次并停止。
我认为答案就在这个...来自 ForkJoinPool
javadoc。
"The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization."
在您的情况下,下载将执行阻塞 I/O 操作。
你为什么要用 ForkJoinPool
做这个?它适用于 CPU 绑定的任务,其子任务太快而无法保证单独调度。您的工作负载受 IO 限制,延迟为 200 毫秒,单独的调度开销可以忽略不计。
使用 Executor
:
import static java.util.stream.Collectors.toList;
import static java.util.concurrent.CompletableFuture.supplyAsync;
ExecutorService threads = Executors.newFixedThreadPool(25);
List<MyObject> result = fileNames.stream()
.map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads))
.collect(toList()).stream()
.map(CompletableFuture::join)
.collect(toList());