使用递增的查询参数重复 WebClient 调用

Repeat WebClient call with incremented query parameter

我正在尝试构建一种方法,该方法应在一个查询参数小于 45000 时向一个外部端点发出许多 HTTP 请求。

我需要这样做,因为外部端点允许我获取 100 个项目,但有超过 44000 个项目要获取。

private int offset = 0;

public Flux<List<Model>> getItems() {
    return Flux.from(
            webClientBuilder
                    .build()
                    .get()
                    .uri(uriBuilder -> uriBuilder
                            .path("/getItems")
                            .queryParam("limit", 100)
                            .queryParam("offset", getOffset())
                            .build())
                    .retrieve()
                    .bodyToMono(Model.class)
                    .doOnSuccess(System.out::println)
                    .flatMap(model -> {
                        setOffset(getOffset() + 100);
                        log.info("Offset: " + getOffset());
                        return repository.saveAll(model.getData().getResults()).collectList();
                    }).delayElement(Duration.ofSeconds(15)))
                    .repeat(() -> getOffset() <= 45000);
}

public int getOffset() {
    return offset;
}

public void setOffset(int offset) {
    this.offset = offset;
}

它似乎有效,因为记录偏移参数增加但 HTTP 请求的偏移量等于 0。方法returns 前 100 个项目而不是 44566 个项目

问题实际上是 webclient 是在订阅之前急切构建的,并且 "cached" 具有初始 offset 值。每次调用后,Flux 被重新订阅,但准备好的带有偏移量的 Web 服务调用仍然是 "cached"。 您必须以惰性方式提供 weblient(例如通过将其包装在 lambda 中),这会强制其所有参数在每次调用时重新计算。有一个特殊的运算符 - defer().

解决方案:

Mono<Model> response = Mono.defer(() -> webClientBuilder
        .build()
        .get()
        .uri(uriBuilder -> uriBuilder
                .path("/getItems")
                .queryParam("limit", 100)
                .queryParam("offset", getOffset())
                .build())
        .retrieve()
        .bodyToMono(Model.class)
);


Flux.from(response
        .doOnEach(System.out::println)
        .flatMap(model -> {
            setOffset(getOffset() + 100);
            log.info("Offset: " + getOffset());
            return repository.saveAll(model.getData().getResults()).collectList();
        }).delayElement(Duration.ofSeconds(15))
).repeat(() -> getOffset() <= 45000).subscribe();

另一个问题证明了急切执行的相同问题: