CompletableFuture 不能像接受某些 java 8 流代码那样工作

CompletableFuture not working as accepted with certain java 8 stream code

我关注

下面的代码片段工作得很好。使用所有线程

片段 1

scrolledPage.stream()
    .filter(this::isUserDoesntHaveId)
    .map(this::processSingle)                                          // processSingle method return supplier
    .map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
    .collect(Collectors.toList())                                      // Collect those as list
    .stream()                                                          // then again making another stream out of that.
    .map(CompletableFuture::join)
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

我对这段代码感到困惑

    .map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
    .collect(Collectors.toList())                                      // Collect those as list
    .stream()                                                          // then again making another stream out of that.
    .map(CompletableFuture::join)

为什么我需要收集然后再次制作流。我已经用下面的方法进行了测试。

片段 2

scrolledPage.stream()
    .filter(this::isUserDoesntHaveId)
    .map(this::processSingle)                                           // processSingle method return supplier
    .map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
    .map(CompletableFuture::join)               // this code is similar without the collecting part.
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

根据我看到的代码,最后一个代码片段只使用了线程池中的一个线程。但是第一个使用每个线程。这两个代码片段之间有什么区别。

第一个片段可以分为两部分。 在第一部分中,您使用 CompletableFuture 将所有任务提交给异步进程,因为它立即 returns Future 对象,流将处理并收集列表中的所有未来。

List<Future> futures = scrolledPage.stream()
    ...
    .map(task -> CompletableFuture.supplyAsync(task, executorService))
    .collect(Collectors.toList())

稍后部分,流处理将在您使用 join 时等待结果,但到那时所有线程都将开始工作。因此它能够利用所有线程。

    futures.stream()
       .map(CompletableFuture::join)
       ...
       .collect(Collectors.toList());

Streams 进行延迟评估意味着它只会在需要时处理元素。 (要求由终端操作触发,在本例中为collect)。

在第二个片段中,map(CompletableFuture::join) 让线程在处理流中的下一个元素之前等待结果

.stream()
...
.map(task -> CompletableFuture.supplyAsync(task, executorService))
.map(CompletableFuture::join)
...
.collect(Collectors.toList())

因此,您在流中的任务(下一个元素)只有在第一个任务完成时才会被处理。这将使您的任务按顺序执行。