如何在Spring-Webflux 中限制并行执行?

How to limit parallel execution in Spring-Webflux?

我的其余客户端实现使用的是 Project Reactor 中的 Webflux,类似于以下内容:

Flux<Response1> request1(String uri) {
  return webClient
          .get()
          .uri(uri)
          .retrieve()
          .bodyToMono(Responses1.class)
          .map(r -> response1ToList(r))
          .flatMapMany(Flux::fromIterable);
}

Flux<Response2> request2(Response1 response1) {
  uri = f(response1);
  return webClient
          .get()
          .uri(uri)
          .retrieve
          .bodyToMono(Responses2.class)
          .map(r -> response1ToList(r))
          .flatMapMany(Flux::fromIterable);
}

Flux<Response2> res2 = res1.flatMap(request2).subscribe();

初始请求 (request1) returns 用于发送一系列请求 (request2) 的元素列表。

我的问题是 request2 都是并行发送的,这对服务器来说太重了。

有没有办法限制同时执行的request2个数?

您可以通过传入并发因子来控制订阅的内部流flatMap()的数量。

Flux<Response1> request1(String uri) {
  return webClient
          .get()
          .uri(uri)
          .retrieve()
          .bodyToMono(Responses1.class)
          .map(r -> response1ToList(r))
          .flatMapMany(Flux::fromIterable);
}

Flux<Response2> request2(Response1 response1) {
  uri = f(response1);
  return webClient
          .get()
          .uri(uri)
          .retrieve
          .bodyToMono(Responses2.class)
          .map(r -> response1ToList(r))
          .flatMapMany(Flux::fromIterable);
}

Flux<Response2> res2 = res1.flatMap(request2, concurrencyFactor).subscribe();

如果你想一个一个地做,你可以使用一个concatMap()