Java ThreadPoolExecutor 使用流和 Callable

Java ThreadPoolExexecutor using streams and Callables

我有一个实现 Callable 的 class,它有一个覆盖 call 和 return 的方法 Long.

我创建了 Callable<Long>List 作为

List<Callable<Long>> callables = new ArrayList<>();
for (File fileEntry : folder.listFiles()) {
    callables.add(new DataProcessor(fileEntry));

我有

ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);

我打电话给

threadPoolExecutor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        })
        .collect(Collectors.toLong(/* what goes here? */));

我想做的是对 future.get() 中的所有 return 值求和。

此外,既然我正在调用 invokeAll,我还需要关闭 Executor 吗?

你需要的是Collectors.summingLong:

.collect(Collectors.summingLong(r -> r));

其中 r -> r 只是一个 ToLongFunction,它从您的 Callable 返回的每个 Long 中生成 long

Also, since I am calling the invokeAll, do I still need to do a shutdown on the Executor?

ExecutorService.invokeAll 没有记录自动关机。所以你需要自己关闭它

您可以将流的 Stream.mapToLong to map the future.get as LongStream and then find the sum 用作:

long sum = threadPoolExecutor.invokeAll(callables)
            .stream()
            .mapToLong(future -> {
                try {
                    return future.get();
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }) // LongStream
            .sum(); // sum of the stream

注意:这使用Collectors.summingLong简化了流API调用链。它允许在遍历集合时避免创建冗余的临时对象。

旁白:您也可以 collect 您的 Callable 作为 :

List<Callable<Long>> callables = fileList.stream()
                                         .map(fileEntry -> new DataProcessor(fileEntry))
                                         .collect(Collectors.toList());

since I am calling the invokeAll, do I still need to do a shutdown on the Executor?

是的,您必须关闭 ExecutorService。您还可以使用 isShutDown() API 作为 :

确认相同的状态
System.out.println(threadPoolExecutor.isShutdown()); // would return false