弹簧靴。反应式网络客户端。响应前连接过早关闭
Springboot. Reactive webclient. Connection prematurely closed BEFORE response
我遇到过这个问题
Pooled connection observed an error
reactor.netty.http.client.HttpClientOperations$PrematureCloseException:
Connection prematurely closed BEFORE response".
我正在通过响应式 Web 客户端在请求的时间范围内从 Graphite 服务器收集指标(为了减少通过 http 传输的数据量,我将天分为 24/4 块),然后将响应组合成一个矩阵并将其保存到 csv 文件 -> 合并到另一个矩阵。
当天数增加时会出现问题(2 或 3 可以正常工作,但天数越多,连接关闭时发生的错误就越多)。尝试使用延迟,它有一点帮助,但要再处理一天而不会出错。
堆栈跟踪:
ClosedConnectionStacktrace
发现有点类似的问题https://github.com/reactor/reactor-netty/issues/413,但不确定。
这是代码片段:
discoveryMono.thenReturn(true) // discover metrics
.flux()
.flatMap(m -> Flux.fromIterable(dates) // process all days
.delayElements(Duration.ofSeconds(1L))
.flatMap(date -> Flux.range(0, 24 / intervalHours) // divide day into chunks
.delayElements(Duration.of(100L, ChronoUnit.MILLIS))
.flatMap(timeFraction -> Flux.fromIterable(sequentialTasks) // task to invoke webclient
.flatMap(task -> {
Instant from = date.plus(timeFraction * intervalHours, ChronoUnit.HOURS);
Instant until = from.plus(intervalHours, ChronoUnit.HOURS);
TaskParams taskParams = new TaskParams(itSystem, from, until, TaskParams.PollingType.FULLDAY);
log.trace("workflow | from={}, until={}", from, until);
return task.apply(taskParams)
// .doOnNext(m -> log.trace("Matrix: {}", m))
.onErrorResume(err -> {
log.error("processFullDaysInChunks | Error: {}", err);
return Mono.empty();
});
}).flatMap(params -> Flux.fromIterable(fileTasks) // tasks to check/merge files, doesn't matter
.flatMap(fileTask -> parTask.apply(params)
.onErrorResume(err -> {
log.error("processFullDaysInChunks | Error: {}", err);
return Mono.empty();
})
)
)
)
)
).subscribeOn(fullDayScheduler).subscribe();
和 webclient 调用的部分任务:
private Flux<GraphiteResultDTO> getGraphiteResults(ITSystem itSystem, Instant from, Instant until) {
String fromStr = FROM_PARAMETER + Long.valueOf(from.getEpochSecond()).toString();
String untilStr = UNTIL_PARAMETER + Long.valueOf(until.getEpochSecond()).toString();
String uri = RENDER_URI + TARGET_PARAMETER + "{targetParam}" + fromStr + untilStr + FORMAT_JSON_PARAMETER;
WebClient webClient = getGraphiteWebClient(itSystem.getDataSource());
Set<String> targetParams = storage.getValueByITSystemId(itSystem.getId()).getSecond();
Flux<GraphiteResultDTO> result = Flux.fromIterable(targetParams)
.delayElements(Duration.of(10, ChronoUnit.MILLIS))
.flatMap(targetParam -> {
Map<String, String> params = Map.ofEntries(entry("targetParam", targetParam));
if (log.isTraceEnabled()) {
log.trace("getGraphiteResults | Uri={}, TargetPatam: {}", uri, targetParam);
}
return webClient.get()
.uri(uri, params)
.retrieve()
.onStatus(HttpStatus::isError, clientResponse -> {
log.trace("clientResponse | transforming body");
clientResponse.bodyToMono(String.class)
.doOnNext(errorString -> log.error("retrieve(), error={}", errorString));
// .flatMap(s -> Flux.error(clientResponse.bodyToFlux(WebClientException.class)));
return Mono.empty();
})
.bodyToFlux(GraphiteResultDTO.class)
.onErrorResume(throwable -> {
log.error("webclient | bodyToFlux error={}", throwable.getMessage());
return Flux.empty();
});
});
return result;
}
通过将 flatMap 运算符替换为带预取 1 的 concatMap 并限制速率(limitRate 运算符)解决了我的问题。所有请求现在都按顺序一一处理。所以现在没有必要使用时间延迟。
我遇到过这个问题
Pooled connection observed an error reactor.netty.http.client.HttpClientOperations$PrematureCloseException: Connection prematurely closed BEFORE response".
我正在通过响应式 Web 客户端在请求的时间范围内从 Graphite 服务器收集指标(为了减少通过 http 传输的数据量,我将天分为 24/4 块),然后将响应组合成一个矩阵并将其保存到 csv 文件 -> 合并到另一个矩阵。 当天数增加时会出现问题(2 或 3 可以正常工作,但天数越多,连接关闭时发生的错误就越多)。尝试使用延迟,它有一点帮助,但要再处理一天而不会出错。
堆栈跟踪: ClosedConnectionStacktrace
发现有点类似的问题https://github.com/reactor/reactor-netty/issues/413,但不确定。
这是代码片段:
discoveryMono.thenReturn(true) // discover metrics
.flux()
.flatMap(m -> Flux.fromIterable(dates) // process all days
.delayElements(Duration.ofSeconds(1L))
.flatMap(date -> Flux.range(0, 24 / intervalHours) // divide day into chunks
.delayElements(Duration.of(100L, ChronoUnit.MILLIS))
.flatMap(timeFraction -> Flux.fromIterable(sequentialTasks) // task to invoke webclient
.flatMap(task -> {
Instant from = date.plus(timeFraction * intervalHours, ChronoUnit.HOURS);
Instant until = from.plus(intervalHours, ChronoUnit.HOURS);
TaskParams taskParams = new TaskParams(itSystem, from, until, TaskParams.PollingType.FULLDAY);
log.trace("workflow | from={}, until={}", from, until);
return task.apply(taskParams)
// .doOnNext(m -> log.trace("Matrix: {}", m))
.onErrorResume(err -> {
log.error("processFullDaysInChunks | Error: {}", err);
return Mono.empty();
});
}).flatMap(params -> Flux.fromIterable(fileTasks) // tasks to check/merge files, doesn't matter
.flatMap(fileTask -> parTask.apply(params)
.onErrorResume(err -> {
log.error("processFullDaysInChunks | Error: {}", err);
return Mono.empty();
})
)
)
)
)
).subscribeOn(fullDayScheduler).subscribe();
和 webclient 调用的部分任务:
private Flux<GraphiteResultDTO> getGraphiteResults(ITSystem itSystem, Instant from, Instant until) {
String fromStr = FROM_PARAMETER + Long.valueOf(from.getEpochSecond()).toString();
String untilStr = UNTIL_PARAMETER + Long.valueOf(until.getEpochSecond()).toString();
String uri = RENDER_URI + TARGET_PARAMETER + "{targetParam}" + fromStr + untilStr + FORMAT_JSON_PARAMETER;
WebClient webClient = getGraphiteWebClient(itSystem.getDataSource());
Set<String> targetParams = storage.getValueByITSystemId(itSystem.getId()).getSecond();
Flux<GraphiteResultDTO> result = Flux.fromIterable(targetParams)
.delayElements(Duration.of(10, ChronoUnit.MILLIS))
.flatMap(targetParam -> {
Map<String, String> params = Map.ofEntries(entry("targetParam", targetParam));
if (log.isTraceEnabled()) {
log.trace("getGraphiteResults | Uri={}, TargetPatam: {}", uri, targetParam);
}
return webClient.get()
.uri(uri, params)
.retrieve()
.onStatus(HttpStatus::isError, clientResponse -> {
log.trace("clientResponse | transforming body");
clientResponse.bodyToMono(String.class)
.doOnNext(errorString -> log.error("retrieve(), error={}", errorString));
// .flatMap(s -> Flux.error(clientResponse.bodyToFlux(WebClientException.class)));
return Mono.empty();
})
.bodyToFlux(GraphiteResultDTO.class)
.onErrorResume(throwable -> {
log.error("webclient | bodyToFlux error={}", throwable.getMessage());
return Flux.empty();
});
});
return result;
}
通过将 flatMap 运算符替换为带预取 1 的 concatMap 并限制速率(limitRate 运算符)解决了我的问题。所有请求现在都按顺序一一处理。所以现在没有必要使用时间延迟。