如何使用 Mono 和 Flux 限制并发 http 请求
How to limit concurrent http requests with Mono & Flux
我想处理 Flux
以限制 Mono
列表发出的并发 HTTP 请求。
完成一些请求(收到响应)后,服务会请求另一个请求,直到等待请求的总数为 15。
单个请求returns一个列表并根据结果触发另一个请求。
此时我想发送有限并发的请求。
因为消费者端,过多的HTTP请求会使对端服务器陷入困境。
我像下面这样使用 flatMapMany
。
public Flux<JsonNode> syncData() {
return service1
.getData(param1)
.flatMapMany(res -> {
List<Mono<JsonNode>> totalTask = new ArrayList<>();
Map<String, Object> originData = service2.getDataFromDB(param2);
res.withArray("data").forEach(row -> {
String id = row.get("id").asText();
if (originData.containsKey(id)) {
totalTask.add(service1.updateRequest(param3));
} else {
totalTask.add(service1.deleteRequest(param4));
}
originData.remove(id);
});
for (left) {
totalTask.add(service1.createRequest(param5));
}
return Flux.merge(totalTask);
});
}
void syncData() {
syncDataService.syncData().????;
}
我试过链接 .window(15)
,但它不起作用。所有请求同时发送。
我如何处理 Flux
我的目标?
恐怕 Project Reactor 没有提供任何速率或时间限制的实现。
但是,您可以找到一堆提供此类功能并与 Project Reactor 兼容的第 3 方库。据我所知,resilience4-reactor 支持它并且还与 Spring 和 Spring 引导框架兼容。
The RateLimiterOperator
checks if a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher. If the rate limit would be exceeded, the RateLimiterOperator
could either delay requesting data from the upstream or it can emit a RequestNotPermitted
error to the downstream subscriber.
RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
Mono.fromCallable(backendService::doSomething)
.transformDeferred(RateLimiterOperator.of(rateLimiter))
更多关于 RateLimiter 模块本身的信息:https://resilience4j.readme.io/docs/ratelimiter
您可以在 Flux 上使用 limitRate
。您可能需要稍微重新格式化您的代码,但请在此处查看文档:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRate-int-
flatMap
接受一个 concurrency
参数:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-
Mono<User> getById(int userId) { ... }
Flux.just(1, 2, 3, 4).flatMap(client::getById, 2)
将并发请求数限制为2。
我想处理 Flux
以限制 Mono
列表发出的并发 HTTP 请求。
完成一些请求(收到响应)后,服务会请求另一个请求,直到等待请求的总数为 15。
单个请求returns一个列表并根据结果触发另一个请求。
此时我想发送有限并发的请求。 因为消费者端,过多的HTTP请求会使对端服务器陷入困境。
我像下面这样使用 flatMapMany
。
public Flux<JsonNode> syncData() {
return service1
.getData(param1)
.flatMapMany(res -> {
List<Mono<JsonNode>> totalTask = new ArrayList<>();
Map<String, Object> originData = service2.getDataFromDB(param2);
res.withArray("data").forEach(row -> {
String id = row.get("id").asText();
if (originData.containsKey(id)) {
totalTask.add(service1.updateRequest(param3));
} else {
totalTask.add(service1.deleteRequest(param4));
}
originData.remove(id);
});
for (left) {
totalTask.add(service1.createRequest(param5));
}
return Flux.merge(totalTask);
});
}
void syncData() {
syncDataService.syncData().????;
}
我试过链接 .window(15)
,但它不起作用。所有请求同时发送。
我如何处理 Flux
我的目标?
恐怕 Project Reactor 没有提供任何速率或时间限制的实现。
但是,您可以找到一堆提供此类功能并与 Project Reactor 兼容的第 3 方库。据我所知,resilience4-reactor 支持它并且还与 Spring 和 Spring 引导框架兼容。
The
RateLimiterOperator
checks if a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher. If the rate limit would be exceeded, theRateLimiterOperator
could either delay requesting data from the upstream or it can emit aRequestNotPermitted
error to the downstream subscriber.
RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
Mono.fromCallable(backendService::doSomething)
.transformDeferred(RateLimiterOperator.of(rateLimiter))
更多关于 RateLimiter 模块本身的信息:https://resilience4j.readme.io/docs/ratelimiter
您可以在 Flux 上使用 limitRate
。您可能需要稍微重新格式化您的代码,但请在此处查看文档:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRate-int-
flatMap
接受一个 concurrency
参数:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-
Mono<User> getById(int userId) { ... }
Flux.just(1, 2, 3, 4).flatMap(client::getById, 2)
将并发请求数限制为2。