为什么 Java 17 在将任务添加到 ForkJoinPool 时抛出 RejectedExecutionException?
Why does Java 17 throw a RejectedExecutionException when adding tasks to a ForkJoinPool?
我使用 Java 16 通过 HTTP 向 API 发出请求。为了加快整体速度,我将其加载到自定义 ForkJoinPool
上。我在下面编译了一个重现示例。
自从移动到 Java 17(openjdk 构建 17.0.1+12-39)后,这将引发 RejectedExecutionException:
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at Test.lambda$retrieveMany(Test.java:30)
为什么会这样? ForkJoinPool 是否发生了我不知道的变化?
代码
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final List<String> urls = List.of("https://whosebug.com", "https://whosebug.com", "https://whosebug.com");
// This succeeds on JDK 16, 17
retrieveMany(urls, 4);
// This fails on JDK 17, but succeeds on 16
retrieveMany(urls, 3);
}
private static List<String> retrieveMany(List<String> urls, int threads) throws InterruptedException, ExecutionException {
return new ForkJoinPool(threads, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> {}, true, 0, threads, 1, null, 1, MINUTES)
.submit(() -> urls.parallelStream()
.map(url -> {
try {
return HttpClient.newBuilder().build().send(HttpRequest.newBuilder(URI.create(url)).build(), BodyHandlers.ofString()).body();
} catch (IOException | InterruptedException aE) { }
return null;
})
.collect(toList()))
.get();
}
}
您提交了一项任务,但它在内部使用了 parallelStream()
,然后在同一个分支连接池的不同线程上运行每个 http。
JDK16 和 17 处理池中所有可用线程都在使用的情况的方式有所不同 - 这就是 saturated
参数变得相关的地方。
当 threads > urls.size()
池永远不会饱和,但在第二种情况下 threads == urls.size()
所以所有线程都在使用中。将 ForkJoinPool
的构造函数中的 null
替换为 saturate
变量,以查看何时触发饱和测试条件:
Predicate<? super ForkJoinPool> saturate = pool -> {
boolean allow = false;
System.out.println(Thread.currentThread().getName()+" saturate:"+allow);
return allow;
};
在 JDK16 上 saturate
谓词被调用多次但继续,而在 JDK17 上,如果它 returns 为假,则处理在第一次调用时停止。如果您切换 allow = true
,则当正在进行的请求数与 parallelStream()
使用的线程数相同时,JDK17 将不会发送 RejectedExecutionException
,并且会在以下情况下继续处理进一步的请求其他请求已完成。
我使用 Java 16 通过 HTTP 向 API 发出请求。为了加快整体速度,我将其加载到自定义 ForkJoinPool
上。我在下面编译了一个重现示例。
自从移动到 Java 17(openjdk 构建 17.0.1+12-39)后,这将引发 RejectedExecutionException:
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at Test.lambda$retrieveMany(Test.java:30)
为什么会这样? ForkJoinPool 是否发生了我不知道的变化?
代码
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final List<String> urls = List.of("https://whosebug.com", "https://whosebug.com", "https://whosebug.com");
// This succeeds on JDK 16, 17
retrieveMany(urls, 4);
// This fails on JDK 17, but succeeds on 16
retrieveMany(urls, 3);
}
private static List<String> retrieveMany(List<String> urls, int threads) throws InterruptedException, ExecutionException {
return new ForkJoinPool(threads, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> {}, true, 0, threads, 1, null, 1, MINUTES)
.submit(() -> urls.parallelStream()
.map(url -> {
try {
return HttpClient.newBuilder().build().send(HttpRequest.newBuilder(URI.create(url)).build(), BodyHandlers.ofString()).body();
} catch (IOException | InterruptedException aE) { }
return null;
})
.collect(toList()))
.get();
}
}
您提交了一项任务,但它在内部使用了 parallelStream()
,然后在同一个分支连接池的不同线程上运行每个 http。
JDK16 和 17 处理池中所有可用线程都在使用的情况的方式有所不同 - 这就是 saturated
参数变得相关的地方。
当 threads > urls.size()
池永远不会饱和,但在第二种情况下 threads == urls.size()
所以所有线程都在使用中。将 ForkJoinPool
的构造函数中的 null
替换为 saturate
变量,以查看何时触发饱和测试条件:
Predicate<? super ForkJoinPool> saturate = pool -> {
boolean allow = false;
System.out.println(Thread.currentThread().getName()+" saturate:"+allow);
return allow;
};
在 JDK16 上 saturate
谓词被调用多次但继续,而在 JDK17 上,如果它 returns 为假,则处理在第一次调用时停止。如果您切换 allow = true
,则当正在进行的请求数与 parallelStream()
使用的线程数相同时,JDK17 将不会发送 RejectedExecutionException
,并且会在以下情况下继续处理进一步的请求其他请求已完成。