有没有办法将 CompletableFuture 放入循环中?
Is there a way to put a CompletableFuture in a loop?
下面代码的问题是我必须等待所有三个任务完成。
如果第一个和第二个任务在 200 毫秒内完成,而第三个任务在 2 秒内完成,那么我将不得不等待 2 秒才能加载下三个 URL。
理想情况下,我会在每个任务完成后立即发送一个新请求,并以某种方式延迟主线程,直到 ArrayList 为空。
简单来说,我希望每个可完成的未来都 运行 在一种由旧任务完成触发的循环中。
(我在 JavaScript 中使用事件经常这样做)
谁能想到我如何实现这一点?
private static void httpClientExample(){
ArrayList<String> urls = new ArrayList<>(
Arrays.asList(
"https://www.bing.com/",
"https://openjdk.java.net/",
"https://openjdk.java.net/",
"https://google.com/",
"https://github.com/",
"https://whosebug.com/"
));
HttpClient httpClient = HttpClient.newHttpClient();
var task1 = httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(urls.get(0)))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
var task2 = httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(urls.get(1)))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
var task3 = httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(urls.get(2)))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
// All tasks have to complete
var all = CompletableFuture.allOf(task1, task2, task3).join();
// Get the next 3 URLs
System.out.println("Main Thread Completed");
}
如果您对并行调用的最大数量没有要求,事情就会变得容易得多:
private static void httpClientExample() throws Exception {
final ArrayList<String> urls = ...; //list of urls
final HttpClient httpClient = HttpClient.newBuilder().executor(
Executors.newFixedThreadPool(10)).build();
final List<CompletableFuture<Void>> allFutures = new ArrayList<>();
for (String url : urls) {
final CompletableFuture<Void> completableFuture = httpClient
.sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(),
HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
allFutures.add(completableFuture);
}
CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).get();
}
让作业本身删除另一个挂起的 URL 并提交它,将需要一个线程安全队列。
让主线程做可能更容易,例如喜欢
var httpClient = HttpClient.newHttpClient();
var pending = new ArrayDeque<CompletableFuture<?>>(3);
for(String url: urls) {
while(pending.size() >= 3 && !pending.removeIf(CompletableFuture::isDone))
CompletableFuture.anyOf(pending.toArray(CompletableFuture<?>[]::new)).join();
pending.addLast(httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(url))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println));
}
CompletableFuture.allOf(pending.toArray(CompletableFuture<?>[]::new)).join();
这将等到三个提交的作业中至少有一个完成(使用 anyOf
/join
),然后再提交下一个作业。当循环结束时,最多可能还有三个 运行 作业。循环后的allOf
/join
会等待那些作业完成,所以后面的作业都已经完成了。当您希望启动器线程在已知所有作业已提交时继续进行,而不等待它们完成,只需删除最后一条语句。
下面代码的问题是我必须等待所有三个任务完成。
如果第一个和第二个任务在 200 毫秒内完成,而第三个任务在 2 秒内完成,那么我将不得不等待 2 秒才能加载下三个 URL。
理想情况下,我会在每个任务完成后立即发送一个新请求,并以某种方式延迟主线程,直到 ArrayList 为空。
简单来说,我希望每个可完成的未来都 运行 在一种由旧任务完成触发的循环中。
(我在 JavaScript 中使用事件经常这样做)
谁能想到我如何实现这一点?
private static void httpClientExample(){
ArrayList<String> urls = new ArrayList<>(
Arrays.asList(
"https://www.bing.com/",
"https://openjdk.java.net/",
"https://openjdk.java.net/",
"https://google.com/",
"https://github.com/",
"https://whosebug.com/"
));
HttpClient httpClient = HttpClient.newHttpClient();
var task1 = httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(urls.get(0)))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
var task2 = httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(urls.get(1)))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
var task3 = httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(urls.get(2)))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
// All tasks have to complete
var all = CompletableFuture.allOf(task1, task2, task3).join();
// Get the next 3 URLs
System.out.println("Main Thread Completed");
}
如果您对并行调用的最大数量没有要求,事情就会变得容易得多:
private static void httpClientExample() throws Exception {
final ArrayList<String> urls = ...; //list of urls
final HttpClient httpClient = HttpClient.newBuilder().executor(
Executors.newFixedThreadPool(10)).build();
final List<CompletableFuture<Void>> allFutures = new ArrayList<>();
for (String url : urls) {
final CompletableFuture<Void> completableFuture = httpClient
.sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(),
HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println);
allFutures.add(completableFuture);
}
CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).get();
}
让作业本身删除另一个挂起的 URL 并提交它,将需要一个线程安全队列。
让主线程做可能更容易,例如喜欢
var httpClient = HttpClient.newHttpClient();
var pending = new ArrayDeque<CompletableFuture<?>>(3);
for(String url: urls) {
while(pending.size() >= 3 && !pending.removeIf(CompletableFuture::isDone))
CompletableFuture.anyOf(pending.toArray(CompletableFuture<?>[]::new)).join();
pending.addLast(httpClient.sendAsync(HttpRequest.newBuilder()
.uri(URI.create(url))
.build(), HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::uri).thenAccept(System.out::println));
}
CompletableFuture.allOf(pending.toArray(CompletableFuture<?>[]::new)).join();
这将等到三个提交的作业中至少有一个完成(使用 anyOf
/join
),然后再提交下一个作业。当循环结束时,最多可能还有三个 运行 作业。循环后的allOf
/join
会等待那些作业完成,所以后面的作业都已经完成了。当您希望启动器线程在已知所有作业已提交时继续进行,而不等待它们完成,只需删除最后一条语句。