客户端超时后继续工作/ return 超时前的当前进度
Keep doing a job after client timeouts / return current progress before timeout
我正在编写一个 Spring-Webflux 应用程序,它可以处理下载和持久化的数据。
一个用例是来自客户端的 REST 请求,用于触发将数据从外部依赖项下载到应用程序的数据库以及 return 总下载条目。
我将此下载处理为对 Flux
的订阅,并使用包含 json 的 Mono
进行响应,告诉用户下载了多少数据:
http :8080/rest/download from==2019-08-14T00:00:00Z
HTTP/1.1 200 OK
Content-Length: 15
Content-Type: application/json;charset=UTF-8
{
"count": 19000
}
使用的 Java 代码如下所示:
public Mono<Map<String, Object>> importDataBetween(Instant from, Instant to) {
return backendAdapter.importData(from, to)
.doOnComplete(this::cleanupDb)
.doOnNext(this::storeData)
.count()
.map(count -> Map.of("count", count)); // e.g. { 'count' : 123 }
}
现在的问题是,如果客户端超时,作业本身将被取消:
http :8080/rest/download from==2019-08-01T00:00:00Z
http: error: Request timed out (30s).
webflux 是否有任何标准功能来处理此类情况?
是否可以计算结果并在超时后计算结果,例如29sreturn那个带备注的号码"the download is still in progress"?
理想情况下,如果操作时间少于 30 秒,结果应该是 returned,否则当前进度或默认响应可能是 returned 但订阅下载-不应取消进程。
非常感谢!
有几种方法可以解决您的一些问题。
你可以使用Flux#bufferTimeout(int maxSize, Duration maxTime)
要缓冲多个项目,当项目数量达到最大值或达到以秒为单位的给定时间时结束发出。
有几种方法,我建议检查通量 api 超时和缓冲区部分以找到适合您的方法。
有两种可能性:如果您想在客户端估计超时之前提前关闭连接,您可以使用 Flux::take
和合理的 Duration。如果您想向响应中添加任意信息,请使用 Flux::buffer
并将自定义消息添加到已缓冲的对象中。
为了展示这一点,请考虑以下示例:
@Slf4j
@RestController
@RequestMapping("api")
public class DemoController {
@GetMapping("flux-take")
public Flux<Result> fluxTake() {
Flux<Result> resultFlux = createFlux();
resultFlux.share().subscribe(result -> log.info("result={}", result));
return resultFlux.take(Duration.ofMillis(1500));
}
@GetMapping("flux-buffer")
public Mono<BufferedResult> fluxBuffer() {
Flux<Result> resultFlux = createFlux();
resultFlux.share().subscribe(result -> log.info("result={}", result));
return resultFlux.buffer(Duration.ofMillis(1500))
.next()
.map(results -> new BufferedResult("Request timed out, returning buffered results", results));
}
private Flux<Result> createFlux() {
return Flux.just(1, 2, 3, 4, 5, 6)
.delayElements(Duration.ofMillis(300))
.timed()
.map(timedId -> new Result(timedId.get(), timedId.elapsed().toMillis()));
}
@Data
@AllArgsConstructor
public static class Result {
private final Integer id;
private final Long elapsedMillis;
}
@Data
@AllArgsConstructor
public static class BufferedResult {
private final String message;
private final List<Result> results;
}
}
从客户的角度来看,结果是这样的:
$ http :8080/api/flux-take --timeout=2
HTTP/1.1 200 OK
Content-Type: application/json
transfer-encoding: chunked
[
{
"elapsedMillis": 304,
"id": 1
},
{
"elapsedMillis": 300,
"id": 2
},
{
"elapsedMillis": 304,
"id": 3
},
{
"elapsedMillis": 302,
"id": 4
}
]
$ http :8080/api/flux-buffer --timeout=2
HTTP/1.1 200 OK
Content-Length: 187
Content-Type: application/json
{
"message": "Request timed out, returning buffered results",
"results": [
{
"elapsedMillis": 301,
"id": 1
},
{
"elapsedMillis": 301,
"id": 2
},
{
"elapsedMillis": 302,
"id": 3
},
{
"elapsedMillis": 304,
"id": 4
}
]
}
请注意客户端的 2 秒超时和应用程序内的 1500 毫秒超时。客户端永远不会达到 2 秒超时并且始终会收到正确的响应。
IntelliJ 将在 resultFlux.share().subscribe(...)
上显示提示 Calling 'subscribe' in non-blocking scope
,但我想不出更好的方法来完成此任务。
我正在编写一个 Spring-Webflux 应用程序,它可以处理下载和持久化的数据。
一个用例是来自客户端的 REST 请求,用于触发将数据从外部依赖项下载到应用程序的数据库以及 return 总下载条目。
我将此下载处理为对 Flux
的订阅,并使用包含 json 的 Mono
进行响应,告诉用户下载了多少数据:
http :8080/rest/download from==2019-08-14T00:00:00Z
HTTP/1.1 200 OK
Content-Length: 15
Content-Type: application/json;charset=UTF-8
{
"count": 19000
}
使用的 Java 代码如下所示:
public Mono<Map<String, Object>> importDataBetween(Instant from, Instant to) {
return backendAdapter.importData(from, to)
.doOnComplete(this::cleanupDb)
.doOnNext(this::storeData)
.count()
.map(count -> Map.of("count", count)); // e.g. { 'count' : 123 }
}
现在的问题是,如果客户端超时,作业本身将被取消:
http :8080/rest/download from==2019-08-01T00:00:00Z
http: error: Request timed out (30s).
webflux 是否有任何标准功能来处理此类情况?
是否可以计算结果并在超时后计算结果,例如29sreturn那个带备注的号码"the download is still in progress"?
理想情况下,如果操作时间少于 30 秒,结果应该是 returned,否则当前进度或默认响应可能是 returned 但订阅下载-不应取消进程。
非常感谢!
有几种方法可以解决您的一些问题。
你可以使用Flux#bufferTimeout(int maxSize, Duration maxTime)
要缓冲多个项目,当项目数量达到最大值或达到以秒为单位的给定时间时结束发出。
有几种方法,我建议检查通量 api 超时和缓冲区部分以找到适合您的方法。
有两种可能性:如果您想在客户端估计超时之前提前关闭连接,您可以使用 Flux::take
和合理的 Duration。如果您想向响应中添加任意信息,请使用 Flux::buffer
并将自定义消息添加到已缓冲的对象中。
为了展示这一点,请考虑以下示例:
@Slf4j
@RestController
@RequestMapping("api")
public class DemoController {
@GetMapping("flux-take")
public Flux<Result> fluxTake() {
Flux<Result> resultFlux = createFlux();
resultFlux.share().subscribe(result -> log.info("result={}", result));
return resultFlux.take(Duration.ofMillis(1500));
}
@GetMapping("flux-buffer")
public Mono<BufferedResult> fluxBuffer() {
Flux<Result> resultFlux = createFlux();
resultFlux.share().subscribe(result -> log.info("result={}", result));
return resultFlux.buffer(Duration.ofMillis(1500))
.next()
.map(results -> new BufferedResult("Request timed out, returning buffered results", results));
}
private Flux<Result> createFlux() {
return Flux.just(1, 2, 3, 4, 5, 6)
.delayElements(Duration.ofMillis(300))
.timed()
.map(timedId -> new Result(timedId.get(), timedId.elapsed().toMillis()));
}
@Data
@AllArgsConstructor
public static class Result {
private final Integer id;
private final Long elapsedMillis;
}
@Data
@AllArgsConstructor
public static class BufferedResult {
private final String message;
private final List<Result> results;
}
}
从客户的角度来看,结果是这样的:
$ http :8080/api/flux-take --timeout=2
HTTP/1.1 200 OK
Content-Type: application/json
transfer-encoding: chunked
[
{
"elapsedMillis": 304,
"id": 1
},
{
"elapsedMillis": 300,
"id": 2
},
{
"elapsedMillis": 304,
"id": 3
},
{
"elapsedMillis": 302,
"id": 4
}
]
$ http :8080/api/flux-buffer --timeout=2
HTTP/1.1 200 OK
Content-Length: 187
Content-Type: application/json
{
"message": "Request timed out, returning buffered results",
"results": [
{
"elapsedMillis": 301,
"id": 1
},
{
"elapsedMillis": 301,
"id": 2
},
{
"elapsedMillis": 302,
"id": 3
},
{
"elapsedMillis": 304,
"id": 4
}
]
}
请注意客户端的 2 秒超时和应用程序内的 1500 毫秒超时。客户端永远不会达到 2 秒超时并且始终会收到正确的响应。
IntelliJ 将在 resultFlux.share().subscribe(...)
上显示提示 Calling 'subscribe' in non-blocking scope
,但我想不出更好的方法来完成此任务。