Webflux - 使用有界弹性调度程序时挂起请求

Webflux - hanging requests when using bounded elastic Scheduler

我有一个使用 webflux 编写的高负载服务(每秒 40 个请求) 我遇到了一个非常糟糕的延迟和性能问题以及我无法解释的行为:在高峰期间的某个时刻,服务挂在随机位置,就好像它没有任何线程来处理请求一样。 然而,该服务确实有几个对不同服务的非反应性调用 - 使用 WebClient,另一个对主要服务的调用通过包装在 Mono.fromCallable(..).publishOn(Schedulers.boundedElastic()).[=20= 中的 sdk 检索主要数据]

所以流程是:

  1. 根据要求,例如 Mono<Request>
  2. 转换为内部对象Mono<RequestAggregator>
  3. 调用 GCP 获取 JWT 令牌,然后使用 webclient
  4. 调用一些服务获取数据
  5. 使用Mono.fromCallable(MainService.getData(RequestAggregator)).publishOn(Schedulers.boundedElastic())
  6. 调用主服务
  7. 调用其他服务获取更多数据(与 3 相同)
  8. 调用其他服务获取更多数据(与 3 相同)
  9. 对所有数据进行一些操作,然后 return Mono<Response>

网络客户端调用看起来像这样:

Mono.fromCallable(() -> GoogleService.getToken(account, clientId)
                .buildIapRequest(REQUEST_URL))
                .map(httpRequest -> httpRequest.getHeaders().getAuthorization())
                .flatMap(authToken -> webClient.post()
                        .uri("/call/some/endpoint")
                        .header(HttpHeaders.AUTHORIZATION, authToken)
                        .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                        .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                        .body(BodyInserters.fromValue(countries))
                        .retrieve()
                        .onStatus(HttpStatus::isError, clientResponse -> {
                            log.error("{} got status code: {}",
                                    ERROR_MSG_ERROR, clientResponse.statusCode());
                            return Mono.error(new SomeWebClientException(STATE_ABBREVIATIONS_ERROR));
                        })
                        .bodyToMono(SomeData.class));

有时第 6 步挂起超过 11 分钟,而此服务没有任何问题。它不是反应性的,但响应时间约为 400 毫秒

另一件值得一提的事情是MainService是一个繁重的IO操作,可能需要1分钟或更长时间。

我觉得 MainService 上有很多请求挂起,没有任何线程留给其他操作,这有意义吗?如果是这样,如何解决这样的问题?

有人可以提出这个问题的任何原因吗?我没主意了

在不了解完整应用程序的情况下无法确定,但阻塞 IO 操作确实是最有可能的罪魁祸首。

Schedulers.boundedElastic(),顾名思义,是有界的。默认情况下,界限是“可用 CPU 核心数的十倍”,因此在 2 核机器上它将是 20。如果并发请求数超过限制,其余的将放入队列中等待无限期地获得免费线程。如果您需要比这更多的并发性,您应该考虑使用具有更高限制的 Scheduler.fromExecutor 来设置您自己的调度程序。