Webflux - 使用有界弹性调度程序时挂起请求
Webflux - hanging requests when using bounded elastic Scheduler
我有一个使用 webflux 编写的高负载服务(每秒 40 个请求)
我遇到了一个非常糟糕的延迟和性能问题以及我无法解释的行为:在高峰期间的某个时刻,服务挂在随机位置,就好像它没有任何线程来处理请求一样。
然而,该服务确实有几个对不同服务的非反应性调用 - 使用 WebClient
,另一个对主要服务的调用通过包装在 Mono.fromCallable(..).publishOn(Schedulers.boundedElastic())
.[=20= 中的 sdk 检索主要数据]
所以流程是:
- 根据要求,例如
Mono<Request>
- 转换为内部对象
Mono<RequestAggregator>
- 调用 GCP 获取 JWT 令牌,然后使用
webclient
调用一些服务获取数据
- 使用
Mono.fromCallable(MainService.getData(RequestAggregator)).publishOn(Schedulers.boundedElastic())
调用主服务
- 调用其他服务获取更多数据(与 3 相同)
- 调用其他服务获取更多数据(与 3 相同)
- 对所有数据进行一些操作,然后 return
Mono<Response>
网络客户端调用看起来像这样:
Mono.fromCallable(() -> GoogleService.getToken(account, clientId)
.buildIapRequest(REQUEST_URL))
.map(httpRequest -> httpRequest.getHeaders().getAuthorization())
.flatMap(authToken -> webClient.post()
.uri("/call/some/endpoint")
.header(HttpHeaders.AUTHORIZATION, authToken)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.body(BodyInserters.fromValue(countries))
.retrieve()
.onStatus(HttpStatus::isError, clientResponse -> {
log.error("{} got status code: {}",
ERROR_MSG_ERROR, clientResponse.statusCode());
return Mono.error(new SomeWebClientException(STATE_ABBREVIATIONS_ERROR));
})
.bodyToMono(SomeData.class));
有时第 6 步挂起超过 11 分钟,而此服务没有任何问题。它不是反应性的,但响应时间约为 400 毫秒
另一件值得一提的事情是MainService
是一个繁重的IO操作,可能需要1分钟或更长时间。
我觉得 MainService
上有很多请求挂起,没有任何线程留给其他操作,这有意义吗?如果是这样,如何解决这样的问题?
有人可以提出这个问题的任何原因吗?我没主意了
在不了解完整应用程序的情况下无法确定,但阻塞 IO 操作确实是最有可能的罪魁祸首。
Schedulers.boundedElastic()
,顾名思义,是有界的。默认情况下,界限是“可用 CPU 核心数的十倍”,因此在 2 核机器上它将是 20。如果并发请求数超过限制,其余的将放入队列中等待无限期地获得免费线程。如果您需要比这更多的并发性,您应该考虑使用具有更高限制的 Scheduler.fromExecutor
来设置您自己的调度程序。
我有一个使用 webflux 编写的高负载服务(每秒 40 个请求)
我遇到了一个非常糟糕的延迟和性能问题以及我无法解释的行为:在高峰期间的某个时刻,服务挂在随机位置,就好像它没有任何线程来处理请求一样。
然而,该服务确实有几个对不同服务的非反应性调用 - 使用 WebClient
,另一个对主要服务的调用通过包装在 Mono.fromCallable(..).publishOn(Schedulers.boundedElastic())
.[=20= 中的 sdk 检索主要数据]
所以流程是:
- 根据要求,例如
Mono<Request>
- 转换为内部对象
Mono<RequestAggregator>
- 调用 GCP 获取 JWT 令牌,然后使用
webclient
调用一些服务获取数据
- 使用
Mono.fromCallable(MainService.getData(RequestAggregator)).publishOn(Schedulers.boundedElastic())
调用主服务
- 调用其他服务获取更多数据(与 3 相同)
- 调用其他服务获取更多数据(与 3 相同)
- 对所有数据进行一些操作,然后 return
Mono<Response>
网络客户端调用看起来像这样:
Mono.fromCallable(() -> GoogleService.getToken(account, clientId)
.buildIapRequest(REQUEST_URL))
.map(httpRequest -> httpRequest.getHeaders().getAuthorization())
.flatMap(authToken -> webClient.post()
.uri("/call/some/endpoint")
.header(HttpHeaders.AUTHORIZATION, authToken)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.body(BodyInserters.fromValue(countries))
.retrieve()
.onStatus(HttpStatus::isError, clientResponse -> {
log.error("{} got status code: {}",
ERROR_MSG_ERROR, clientResponse.statusCode());
return Mono.error(new SomeWebClientException(STATE_ABBREVIATIONS_ERROR));
})
.bodyToMono(SomeData.class));
有时第 6 步挂起超过 11 分钟,而此服务没有任何问题。它不是反应性的,但响应时间约为 400 毫秒
另一件值得一提的事情是MainService
是一个繁重的IO操作,可能需要1分钟或更长时间。
我觉得 MainService
上有很多请求挂起,没有任何线程留给其他操作,这有意义吗?如果是这样,如何解决这样的问题?
有人可以提出这个问题的任何原因吗?我没主意了
在不了解完整应用程序的情况下无法确定,但阻塞 IO 操作确实是最有可能的罪魁祸首。
Schedulers.boundedElastic()
,顾名思义,是有界的。默认情况下,界限是“可用 CPU 核心数的十倍”,因此在 2 核机器上它将是 20。如果并发请求数超过限制,其余的将放入队列中等待无限期地获得免费线程。如果您需要比这更多的并发性,您应该考虑使用具有更高限制的 Scheduler.fromExecutor
来设置您自己的调度程序。