如何在Spring-Webflux 中限制并行执行?
How to limit parallel execution in Spring-Webflux?
我的其余客户端实现使用的是 Project Reactor 中的 Webflux,类似于以下内容:
Flux<Response1> request1(String uri) {
return webClient
.get()
.uri(uri)
.retrieve()
.bodyToMono(Responses1.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> request2(Response1 response1) {
uri = f(response1);
return webClient
.get()
.uri(uri)
.retrieve
.bodyToMono(Responses2.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> res2 = res1.flatMap(request2).subscribe();
初始请求 (request1) returns 用于发送一系列请求 (request2) 的元素列表。
我的问题是 request2 都是并行发送的,这对服务器来说太重了。
有没有办法限制同时执行的request2个数?
您可以通过传入并发因子来控制订阅的内部流flatMap()
的数量。
Flux<Response1> request1(String uri) {
return webClient
.get()
.uri(uri)
.retrieve()
.bodyToMono(Responses1.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> request2(Response1 response1) {
uri = f(response1);
return webClient
.get()
.uri(uri)
.retrieve
.bodyToMono(Responses2.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> res2 = res1.flatMap(request2, concurrencyFactor).subscribe();
如果你想一个一个地做,你可以使用一个concatMap()
。
我的其余客户端实现使用的是 Project Reactor 中的 Webflux,类似于以下内容:
Flux<Response1> request1(String uri) {
return webClient
.get()
.uri(uri)
.retrieve()
.bodyToMono(Responses1.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> request2(Response1 response1) {
uri = f(response1);
return webClient
.get()
.uri(uri)
.retrieve
.bodyToMono(Responses2.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> res2 = res1.flatMap(request2).subscribe();
初始请求 (request1) returns 用于发送一系列请求 (request2) 的元素列表。
我的问题是 request2 都是并行发送的,这对服务器来说太重了。
有没有办法限制同时执行的request2个数?
您可以通过传入并发因子来控制订阅的内部流flatMap()
的数量。
Flux<Response1> request1(String uri) {
return webClient
.get()
.uri(uri)
.retrieve()
.bodyToMono(Responses1.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> request2(Response1 response1) {
uri = f(response1);
return webClient
.get()
.uri(uri)
.retrieve
.bodyToMono(Responses2.class)
.map(r -> response1ToList(r))
.flatMapMany(Flux::fromIterable);
}
Flux<Response2> res2 = res1.flatMap(request2, concurrencyFactor).subscribe();
如果你想一个一个地做,你可以使用一个concatMap()
。