CompletableFuture 按请求组分组

CompletableFuture grouped by groups of requests

我想向 API Rest 发送 100K 个请求。所以我有一个循环,我以同步方式发送请求(并且需要几分钟才能完成所有请求)。

但最好以异步方式发送它们,因为目的地可以扩展它们的副本,所以该过程可以花费更少的时间。

我已经阅读了 CompletableFuture 以及如何使用 allOf() 方法组合所有结果。

但是有什么方法可以以 1K 请求为一组进行调用吗?因为如果我并行发送所有请求,目的地将关闭连接。

所以我想在 1K 并行请求组中发送请求,当每个组完成时,对响应做一些事情(我不需要订购的响应)

谢谢。

是的,您可以使用可完成的期货,这很容易完成。

假设您有 HTTP 客户端 return CompletableFuture on requests。

package com.example.demo.cf;

import com.example.demo.dto.Response;

import java.util.concurrent.CompletableFuture;

public interface HttpClient {
    CompletableFuture<Response> makeHttp();
}

并有批量和顺序请求的服务。

package com.example.demo.cf;

import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

public class CfBatchingService {

    private static final int BATCH_SIZE = 100;

    private HttpClient httpClient;

    public void sendRequests() {
        CompletableFuture<Void> cf = CompletableFuture.completedFuture(null);
        for(int grp = 0; grp < 10; grp++) {
            cf = cf.thenCompose(unused -> CompletableFuture.allOf(IntStream.range(0, BATCH_SIZE).mapToObj(idx -> httpClient.makeHttp()).toArray(CompletableFuture[]::new)));
        }

        cf.whenComplete((unused, throwable) -> {
           System.out.println("All requested executed");
        });
    }

}

所以,这里唯一需要做的就是使用 thenCompose 方法,期望从 lambda 中获取 CompletableFuture。

要将请求分组并创建单个 CompletableFuture,您可以使用方法 CompletableFuture.allOf