为什么 `parallelStream` 比 `CompletableFuture` 实现更快?
Why is `parallelStream` faster than the `CompletableFuture` implementation?
我想提高我的后端 REST API 在某个操作上的性能,该操作按顺序轮询多个不同的外部 API 并收集它们的响应并将它们全部展平成一个响应列表.
最近刚刚了解到 CompletableFuture
s,我决定试一试,并将该解决方案与只涉及将我的 stream
更改为 parallelStream
的解决方案进行比较。
这是用于基准测试的代码:
package com.foo;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class ConcurrentTest {
static final List<String> REST_APIS =
Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
MyTestUtil myTest = new MyTestUtil();
long millisBefore; // used to benchmark
@BeforeEach
void setUp() {
millisBefore = System.currentTimeMillis();
}
@AfterEach
void tearDown() {
System.out.printf("time taken : %.4fs\n",
(System.currentTimeMillis() - millisBefore) / 1000d);
}
@Test
void parallelSolution() { // 4s
var parallel = REST_APIS.parallelStream()
.map(api -> myTest.collectOneRestCall())
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println("List of responses: " + parallel.toString());
}
@Test
void futureSolution() throws Exception { // 8s
var futures = myTest.collectAllResponsesAsync(REST_APIS);
System.out.println("List of responses: " + futures.get()); // only blocks here
}
@Test
void originalProblem() { // 32s
var sequential = REST_APIS.stream()
.map(api -> myTest.collectOneRestCall())
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println("List of responses: " + sequential.toString());
}
}
class MyTestUtil {
public static final List<String> RESULTS = Arrays.asList("1", "2", "3", "4");
List<String> collectOneRestCall() {
try {
TimeUnit.SECONDS.sleep(4); // simulating the await of the response
} catch (Exception io) {
throw new RuntimeException(io);
} finally {
return MyTestUtil.RESULTS; // always return something, for this demonstration
}
}
CompletableFuture<List<String>> collectAllResponsesAsync(List<String> restApiUrlList) {
/* Collecting the list of all the async requests that build a List<String>. */
List<CompletableFuture<List<String>>> completableFutures = restApiUrlList.stream()
.map(api -> nonBlockingRestCall())
.collect(Collectors.toList());
/* Creating a single Future that contains all the Futures we just created ("flatmap"). */
CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures
.toArray(new CompletableFuture[restApiUrlList.size()]));
/* When all the Futures have completed, we join them to create merged List<String>. */
CompletableFuture<List<String>> allCompletableFutures = allFutures
.thenApply(future -> completableFutures.stream()
.filter(Objects::nonNull) // we filter out the failed calls
.map(CompletableFuture::join)
.flatMap(List::stream) // creating a List<String> from List<List<String>>
.collect(Collectors.toList())
);
return allCompletableFutures;
}
private CompletableFuture<List<String>> nonBlockingRestCall() {
/* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
return CompletableFuture.supplyAsync(() -> collectOneRestCall())
.exceptionally(ex -> {
return null; // gets managed in the wrapping Future
});
}
}
有 8 个(假)API 的列表。每个响应需要 4 秒来执行,returns 一个包含 4 个实体的列表(字符串,在我们的例子中,为了简单起见)。
结果:
stream
: 32 秒
parallelStream
: 4 秒
CompletableFuture
: 8 秒
我很惊讶并预计最后两个几乎相同。到底是什么造成了这种差异?据我所知,他们都在使用 ForkJoinPool.commonPool()
.
我天真的解释是 parallelStream
,因为它是一个阻塞操作,使用实际的 MainThread
来处理它的工作负载,因此与 CompletableFuture
是异步的,因此不能使用 MainThread
.
CompletableFuture.supplyAsync()
将最终使用 ForkJoinPool
并以 Runtime.getRuntime().availableProcessors() - 1
(JDK 11 source)
的并行性初始化
看起来你有一台 8 处理器的机器。因此池中有 7 个线程。
有 8 个 API 调用,因此公共池中一次只能 运行 7 个。对于可完成的期货测试,将有 8 个任务 运行 您的主线程阻塞,直到它们全部完成。 7 将能够立即执行意味着必须等待 4 秒。
parallelStream()
也使用相同的线程池,但不同之处在于第一个任务将在执行流的终端操作的主线程上执行,剩下 7 个任务分配到公共池。因此,在这种情况下,线程足以 运行 并行处理所有内容。尝试将任务数增加到 9,您将获得 8 秒的 运行 测试时间。
我想提高我的后端 REST API 在某个操作上的性能,该操作按顺序轮询多个不同的外部 API 并收集它们的响应并将它们全部展平成一个响应列表.
最近刚刚了解到 CompletableFuture
s,我决定试一试,并将该解决方案与只涉及将我的 stream
更改为 parallelStream
的解决方案进行比较。
这是用于基准测试的代码:
package com.foo;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class ConcurrentTest {
static final List<String> REST_APIS =
Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
MyTestUtil myTest = new MyTestUtil();
long millisBefore; // used to benchmark
@BeforeEach
void setUp() {
millisBefore = System.currentTimeMillis();
}
@AfterEach
void tearDown() {
System.out.printf("time taken : %.4fs\n",
(System.currentTimeMillis() - millisBefore) / 1000d);
}
@Test
void parallelSolution() { // 4s
var parallel = REST_APIS.parallelStream()
.map(api -> myTest.collectOneRestCall())
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println("List of responses: " + parallel.toString());
}
@Test
void futureSolution() throws Exception { // 8s
var futures = myTest.collectAllResponsesAsync(REST_APIS);
System.out.println("List of responses: " + futures.get()); // only blocks here
}
@Test
void originalProblem() { // 32s
var sequential = REST_APIS.stream()
.map(api -> myTest.collectOneRestCall())
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println("List of responses: " + sequential.toString());
}
}
class MyTestUtil {
public static final List<String> RESULTS = Arrays.asList("1", "2", "3", "4");
List<String> collectOneRestCall() {
try {
TimeUnit.SECONDS.sleep(4); // simulating the await of the response
} catch (Exception io) {
throw new RuntimeException(io);
} finally {
return MyTestUtil.RESULTS; // always return something, for this demonstration
}
}
CompletableFuture<List<String>> collectAllResponsesAsync(List<String> restApiUrlList) {
/* Collecting the list of all the async requests that build a List<String>. */
List<CompletableFuture<List<String>>> completableFutures = restApiUrlList.stream()
.map(api -> nonBlockingRestCall())
.collect(Collectors.toList());
/* Creating a single Future that contains all the Futures we just created ("flatmap"). */
CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures
.toArray(new CompletableFuture[restApiUrlList.size()]));
/* When all the Futures have completed, we join them to create merged List<String>. */
CompletableFuture<List<String>> allCompletableFutures = allFutures
.thenApply(future -> completableFutures.stream()
.filter(Objects::nonNull) // we filter out the failed calls
.map(CompletableFuture::join)
.flatMap(List::stream) // creating a List<String> from List<List<String>>
.collect(Collectors.toList())
);
return allCompletableFutures;
}
private CompletableFuture<List<String>> nonBlockingRestCall() {
/* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
return CompletableFuture.supplyAsync(() -> collectOneRestCall())
.exceptionally(ex -> {
return null; // gets managed in the wrapping Future
});
}
}
有 8 个(假)API 的列表。每个响应需要 4 秒来执行,returns 一个包含 4 个实体的列表(字符串,在我们的例子中,为了简单起见)。
结果:
stream
: 32 秒parallelStream
: 4 秒CompletableFuture
: 8 秒
我很惊讶并预计最后两个几乎相同。到底是什么造成了这种差异?据我所知,他们都在使用 ForkJoinPool.commonPool()
.
我天真的解释是 parallelStream
,因为它是一个阻塞操作,使用实际的 MainThread
来处理它的工作负载,因此与 CompletableFuture
是异步的,因此不能使用 MainThread
.
CompletableFuture.supplyAsync()
将最终使用 ForkJoinPool
并以 Runtime.getRuntime().availableProcessors() - 1
(JDK 11 source)
看起来你有一台 8 处理器的机器。因此池中有 7 个线程。
有 8 个 API 调用,因此公共池中一次只能 运行 7 个。对于可完成的期货测试,将有 8 个任务 运行 您的主线程阻塞,直到它们全部完成。 7 将能够立即执行意味着必须等待 4 秒。
parallelStream()
也使用相同的线程池,但不同之处在于第一个任务将在执行流的终端操作的主线程上执行,剩下 7 个任务分配到公共池。因此,在这种情况下,线程足以 运行 并行处理所有内容。尝试将任务数增加到 9,您将获得 8 秒的 运行 测试时间。