客户端超时后继续工作/ return 超时前的当前进度

Keep doing a job after client timeouts / return current progress before timeout

我正在编写一个 Spring-Webflux 应用程序,它可以处理下载和持久化的数据。

一个用例是来自客户端的 REST 请求,用于触发将数据从外部依赖项下载到应用程序的数据库以及 return 总下载条目。

我将此下载处理为对 Flux 的订阅,并使用包含 json 的 Mono 进行响应,告诉用户下载了多少数据:

http :8080/rest/download from==2019-08-14T00:00:00Z

HTTP/1.1 200 OK
Content-Length: 15
Content-Type: application/json;charset=UTF-8

{
    "count": 19000
}

使用的 Java 代码如下所示:

public Mono<Map<String, Object>> importDataBetween(Instant from, Instant to) {
    return backendAdapter.importData(from, to)
            .doOnComplete(this::cleanupDb)
            .doOnNext(this::storeData)
            .count()
            .map(count -> Map.of("count", count)); // e.g. { 'count' : 123 }
}

现在的问题是,如果客户端超时,作业本身将被取消:

http :8080/rest/download from==2019-08-01T00:00:00Z

http: error: Request timed out (30s).

webflux 是否有任何标准功能来处理此类情况?

是否可以计算结果并在超时后计算结果,例如29sreturn那个带备注的号码"the download is still in progress"?

理想情况下,如果操作时间少于 30 秒,结果应该是 returned,否则当前进度或默认响应可能是 returned 但订阅下载-不应取消进程。

非常感谢!

有几种方法可以解决您的一些问题。

你可以使用Flux#bufferTimeout(int maxSize, Duration maxTime)

要缓冲多个项目,当项目数量达到最大值或达到以秒为单位的给定时间时结束发出。

有几种方法,我建议检查通量 api 超时和缓冲区部分以找到适合您的方法。

Flux API

有两种可能性:如果您想在客户端估计超时之前提前关闭连接,您可以使用 Flux::take 和合理的 Duration。如果您想向响应中添加任意信息,请使用 Flux::buffer 并将自定义消息添加到已缓冲的对象中。

为了展示这一点,请考虑以下示例:

@Slf4j
@RestController
@RequestMapping("api")
public class DemoController {

    @GetMapping("flux-take")
    public Flux<Result> fluxTake() {
        Flux<Result> resultFlux = createFlux();
        resultFlux.share().subscribe(result -> log.info("result={}", result));
        return resultFlux.take(Duration.ofMillis(1500));
    }

    @GetMapping("flux-buffer")
    public Mono<BufferedResult> fluxBuffer() {
        Flux<Result> resultFlux = createFlux();
        resultFlux.share().subscribe(result -> log.info("result={}", result));
        return resultFlux.buffer(Duration.ofMillis(1500))
                .next()
                .map(results -> new BufferedResult("Request timed out, returning buffered results", results));
    }

    private Flux<Result> createFlux() {
        return Flux.just(1, 2, 3, 4, 5, 6)
                .delayElements(Duration.ofMillis(300))
                .timed()
                .map(timedId -> new Result(timedId.get(), timedId.elapsed().toMillis()));
    }

    @Data
    @AllArgsConstructor
    public static class Result {
        private final Integer id;
        private final Long elapsedMillis;
    }

    @Data
    @AllArgsConstructor
    public static class BufferedResult {
        private final String message;
        private final List<Result> results;
    }
}

从客户的角度来看,结果是这样的:

$ http :8080/api/flux-take --timeout=2
HTTP/1.1 200 OK
Content-Type: application/json
transfer-encoding: chunked

[
    {
        "elapsedMillis": 304,
        "id": 1
    },
    {
        "elapsedMillis": 300,
        "id": 2
    },
    {
        "elapsedMillis": 304,
        "id": 3
    },
    {
        "elapsedMillis": 302,
        "id": 4
    }
]

$ http :8080/api/flux-buffer --timeout=2
HTTP/1.1 200 OK
Content-Length: 187
Content-Type: application/json

{
    "message": "Request timed out, returning buffered results",
    "results": [
        {
            "elapsedMillis": 301,
            "id": 1
        },
        {
            "elapsedMillis": 301,
            "id": 2
        },
        {
            "elapsedMillis": 302,
            "id": 3
        },
        {
            "elapsedMillis": 304,
            "id": 4
        }
    ]
}

请注意客户端的 2 秒超时和应用程序内的 1500 毫秒超时。客户端永远不会达到 2 秒超时并且始终会收到正确的响应。

IntelliJ 将在 resultFlux.share().subscribe(...) 上显示提示 Calling 'subscribe' in non-blocking scope,但我想不出更好的方法来完成此任务。