Parallel Flux 阻塞调用

Parallel Flux blocking call

我的应用程序设置作为问题的一部分被提及# 我正在尝试使用 Spring webclient 在 Spring AMQP rabbit MQ 消费者线程中进行 API 调用。 问题似乎是并行通量阻塞调用在前几个请求被触发后停止或需要很长时间。

为了模拟这一点,我做了以下简约设置 -

使用的依赖项

Spring boot 2.2.6.RELEASE
spring-boot-starter-web
spring-boot-starter-webflux
reactor-netty 0.9.14.RELEASE

如其他链接问题所述,以下是 webclient 的配置 -

@Bean
public WebClient webClient() {
ConnectionProvider connectionProvider = ConnectionProvider
.builder("fixed")
.lifo()
.pendingAcquireTimeout(Duration.ofMillis(200000))
.maxConnections(100)
.pendingAcquireMaxCount(3000)
.maxIdleTime(Duration.ofMillis(290000))
.build();

HttpClient client = HttpClient.create(connectionProvider);
client.tcpConfiguration(<<connection timeout, read timeout, write timeout is set here....>>);

Webclient.Builder builder = 
Webclient.builder().baseUrl(<<base URL>>).clientConnector(new ReactorClientHttpConnector(client));

return builder.build();
}

下面是 @Service class 并行 flux webclient 调用 -

@Service
public class FluxtestService {

public Flux<Response> getFlux(List<Request> reqList) {
  return Flux
.fromIterable(reqList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(s -> {
  return webClient
         .method(POST)
         .uri(<<downstream url>>)
         .body(BodyInserters.fromValue(s))
         .exchange()
         .flatMap(response -> {
            if(response.statusCode().isError()){
               return Mono.just(new Response());
            }
            return response.bodyToMono(Response.class);
         }) 
        }).sequential();
}
}
}

为了模拟Spring AMQP rabbit mq consumer/listener,我在下面创建了@RestController -

@RestController
public class FluxTestController

@Autowired
private FluxtestService service;

@PostMapping("/fluxtest")
public List<Response> getFlux (List<Request> reqlist) {
return service.getFlux(reqlist).collectList().block();
}

我尝试使用大约 15 个线程从 jmeter 触发请求。前几组请求处理得非常快。在处理请求时,我可以在日志文件中看到以下日志集 -

Channel cleaned, now 32 active connections and 68 inactive connections

一旦我提交了更多请求集,活动连接就会不断增加,直到达到配置的最大值 100。我根本看不到它在减少。至此,响应时间还可以。

但是任何后续请求开始需要很长时间。此外,即使没有请求被触发,我也没有看到活动连接减少太多。

同样过了一段时间,我看到以下异常 -

reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 200000 ms

这可能表明下游连接没有释放。请就此问题和可能的修复提供建议。

问题似乎是因为未正确释放基础连接以防 webclient 下游调用以错误状态响应。在将“exchange”与“webclient”一起使用时,似乎我们需要确保正确发布响应;否则会导致连接泄漏。以下是似乎解决此问题的更改 -

替换

.flatMap(response -> {
   if(response.statusCode().isError()) {
     return Mono.just(new Response());
   }
   return response.bodyToMono(Response.class);
})

.flatMap(response -> {
   if(response.statusCode().isError()) {
     response.releaseBody().thenReturn(Mono.just(new Response()));
   }
   return response.bodyToMono(Response.class);
})