如何使用 Mono 和 Flux 限制并发 http 请求

How to limit concurrent http requests with Mono & Flux

我想处理 Flux 以限制 Mono 列表发出的并发 HTTP 请求。

完成一些请求(收到响应)后,服务会请求另一个请求,直到等待请求的总数为 15。

单个请求returns一个列表并根据结果触发另一个请求。

此时我想发送有限并发的请求。 因为消费者端,过多的HTTP请求会使对端服务器陷入困境。

我像下面这样使用 flatMapMany

public Flux<JsonNode> syncData() {
    return service1
        .getData(param1)
        .flatMapMany(res -> {
                List<Mono<JsonNode>> totalTask = new ArrayList<>();
                Map<String, Object> originData = service2.getDataFromDB(param2);
                res.withArray("data").forEach(row -> {
                       String id = row.get("id").asText();
                       if (originData.containsKey(id)) {
                           totalTask.add(service1.updateRequest(param3));
                       } else {
                            totalTask.add(service1.deleteRequest(param4));
                       }
                       originData.remove(id);
                });
                for (left) {
                    totalTask.add(service1.createRequest(param5));
                }
                return Flux.merge(totalTask);
        });
}
void syncData() {
    syncDataService.syncData().????;
}

我试过链接 .window(15),但它不起作用。所有请求同时发送。

我如何处理 Flux 我的目标?

恐怕 Project Reactor 没有提供任何速率或时间限制的实现。

但是,您可以找到一堆提供此类功能并与 Project Reactor 兼容的第 3 方库。据我所知,resilience4-reactor 支持它并且还与 Spring 和 Spring 引导框架兼容。

The RateLimiterOperator checks if a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher. If the rate limit would be exceeded, the RateLimiterOperator could either delay requesting data from the upstream or it can emit a RequestNotPermitted error to the downstream subscriber.

RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
Mono.fromCallable(backendService::doSomething)
    .transformDeferred(RateLimiterOperator.of(rateLimiter))

更多关于 RateLimiter 模块本身的信息:https://resilience4j.readme.io/docs/ratelimiter

您可以在 Flux 上使用 limitRate。您可能需要稍微重新格式化您的代码,但请在此处查看文档:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRate-int-

flatMap 接受一个 concurrency 参数:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-


Mono<User> getById(int userId) { ... }

Flux.just(1, 2, 3, 4).flatMap(client::getById, 2)

将并发请求数限制为2。