CompletableFuture 不能像接受某些 java 8 流代码那样工作
CompletableFuture not working as accepted with certain java 8 stream code
我关注
下面的代码片段工作得很好。使用所有线程
片段 1
scrolledPage.stream()
.filter(this::isUserDoesntHaveId)
.map(this::processSingle) // processSingle method return supplier
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.collect(Collectors.toList()) // Collect those as list
.stream() // then again making another stream out of that.
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
我对这段代码感到困惑
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.collect(Collectors.toList()) // Collect those as list
.stream() // then again making another stream out of that.
.map(CompletableFuture::join)
为什么我需要收集然后再次制作流。我已经用下面的方法进行了测试。
片段 2
scrolledPage.stream()
.filter(this::isUserDoesntHaveId)
.map(this::processSingle) // processSingle method return supplier
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.map(CompletableFuture::join) // this code is similar without the collecting part.
.filter(Objects::nonNull)
.collect(Collectors.toList());
根据我看到的代码,最后一个代码片段只使用了线程池中的一个线程。但是第一个使用每个线程。这两个代码片段之间有什么区别。
第一个片段可以分为两部分。
在第一部分中,您使用 CompletableFuture
将所有任务提交给异步进程,因为它立即 returns Future
对象,流将处理并收集列表中的所有未来。
List<Future> futures = scrolledPage.stream()
...
.map(task -> CompletableFuture.supplyAsync(task, executorService))
.collect(Collectors.toList())
稍后部分,流处理将在您使用 join
时等待结果,但到那时所有线程都将开始工作。因此它能够利用所有线程。
futures.stream()
.map(CompletableFuture::join)
...
.collect(Collectors.toList());
Streams 进行延迟评估意味着它只会在需要时处理元素。 (要求由终端操作触发,在本例中为collect
)。
在第二个片段中,map(CompletableFuture::join)
让线程在处理流中的下一个元素之前等待结果
.stream()
...
.map(task -> CompletableFuture.supplyAsync(task, executorService))
.map(CompletableFuture::join)
...
.collect(Collectors.toList())
因此,您在流中的任务(下一个元素)只有在第一个任务完成时才会被处理。这将使您的任务按顺序执行。
我关注
下面的代码片段工作得很好。使用所有线程
片段 1
scrolledPage.stream()
.filter(this::isUserDoesntHaveId)
.map(this::processSingle) // processSingle method return supplier
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.collect(Collectors.toList()) // Collect those as list
.stream() // then again making another stream out of that.
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
我对这段代码感到困惑
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.collect(Collectors.toList()) // Collect those as list
.stream() // then again making another stream out of that.
.map(CompletableFuture::join)
为什么我需要收集然后再次制作流。我已经用下面的方法进行了测试。
片段 2
scrolledPage.stream()
.filter(this::isUserDoesntHaveId)
.map(this::processSingle) // processSingle method return supplier
.map(task -> CompletableFuture.supplyAsync(task, executorService)) // map to CompletableFuture
.map(CompletableFuture::join) // this code is similar without the collecting part.
.filter(Objects::nonNull)
.collect(Collectors.toList());
根据我看到的代码,最后一个代码片段只使用了线程池中的一个线程。但是第一个使用每个线程。这两个代码片段之间有什么区别。
第一个片段可以分为两部分。
在第一部分中,您使用 CompletableFuture
将所有任务提交给异步进程,因为它立即 returns Future
对象,流将处理并收集列表中的所有未来。
List<Future> futures = scrolledPage.stream()
...
.map(task -> CompletableFuture.supplyAsync(task, executorService))
.collect(Collectors.toList())
稍后部分,流处理将在您使用 join
时等待结果,但到那时所有线程都将开始工作。因此它能够利用所有线程。
futures.stream()
.map(CompletableFuture::join)
...
.collect(Collectors.toList());
Streams 进行延迟评估意味着它只会在需要时处理元素。 (要求由终端操作触发,在本例中为collect
)。
在第二个片段中,map(CompletableFuture::join)
让线程在处理流中的下一个元素之前等待结果
.stream()
...
.map(task -> CompletableFuture.supplyAsync(task, executorService))
.map(CompletableFuture::join)
...
.collect(Collectors.toList())
因此,您在流中的任务(下一个元素)只有在第一个任务完成时才会被处理。这将使您的任务按顺序执行。