有没有办法将 CompletableFuture 放入循环中?

Is there a way to put a CompletableFuture in a loop?

下面代码的问题是我必须等待所有三个任务完成。

如果第一个和第二个任务在 200 毫秒内完成,而第三个任务在 2 秒内完成,那么我将不得不等待 2 秒才能加载下三个 URL。

理想情况下,我会在每个任务完成后立即发送一个新请求,并以某种方式延迟主线程,直到 ArrayList 为空。

简单来说,我希望每个可完成的未来都 运行 在一种由旧任务完成触发的循环中。

(我在 JavaScript 中使用事件经常这样做)

谁能想到我如何实现这一点?

    private static void httpClientExample(){

    ArrayList<String> urls = new ArrayList<>(
            Arrays.asList(
                    "https://www.bing.com/",
                    "https://openjdk.java.net/",
                    "https://openjdk.java.net/",
                    "https://google.com/",
                    "https://github.com/",
                    "https://whosebug.com/"
            ));

    HttpClient httpClient = HttpClient.newHttpClient();

    var task1 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(0)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task2 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(1)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task3 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(2)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    // All tasks have to complete
    var all = CompletableFuture.allOf(task1, task2, task3).join();
    
    // Get the next 3 URLs

    System.out.println("Main Thread Completed");
}

如果您对并行调用的最大数量没有要求,事情就会变得容易得多:

private static void httpClientExample() throws Exception {

  final ArrayList<String> urls = ...; //list of urls 

  final HttpClient httpClient = HttpClient.newBuilder().executor(
                                    Executors.newFixedThreadPool(10)).build();

  final List<CompletableFuture<Void>> allFutures = new ArrayList<>();
  for (String url : urls) {
    final CompletableFuture<Void> completableFuture = httpClient
        .sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(),
            HttpResponse.BodyHandlers.ofString())
        .thenApply(HttpResponse::uri).thenAccept(System.out::println);
    allFutures.add(completableFuture);
  }

  CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).get();
}

让作业本身删除另一个挂起的 URL 并提交它,将需要一个线程安全队列。

让主线程做可能更容易,例如喜欢

var httpClient = HttpClient.newHttpClient();
var pending = new ArrayDeque<CompletableFuture<?>>(3);
for(String url: urls) {
    while(pending.size() >= 3 && !pending.removeIf(CompletableFuture::isDone))
        CompletableFuture.anyOf(pending.toArray(CompletableFuture<?>[]::new)).join();

    pending.addLast(httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(url))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println));
}
CompletableFuture.allOf(pending.toArray(CompletableFuture<?>[]::new)).join();

这将等到三个提交的作业中至少有一个完成(使用 anyOf/join),然后再提交下一个作业。当循环结束时,最多可能还有三个 运行 作业。循环后的allOf/join会等待那些作业完成,所以后面的作业都已经完成了。当您希望启动器线程在已知所有作业已提交时继续进行,而不等待它们完成,只需删除最后一条语句。