使用递增的查询参数重复 WebClient 调用
Repeat WebClient call with incremented query parameter
我正在尝试构建一种方法,该方法应在一个查询参数小于 45000 时向一个外部端点发出许多 HTTP 请求。
我需要这样做,因为外部端点允许我获取 100 个项目,但有超过 44000 个项目要获取。
private int offset = 0;
public Flux<List<Model>> getItems() {
return Flux.from(
webClientBuilder
.build()
.get()
.uri(uriBuilder -> uriBuilder
.path("/getItems")
.queryParam("limit", 100)
.queryParam("offset", getOffset())
.build())
.retrieve()
.bodyToMono(Model.class)
.doOnSuccess(System.out::println)
.flatMap(model -> {
setOffset(getOffset() + 100);
log.info("Offset: " + getOffset());
return repository.saveAll(model.getData().getResults()).collectList();
}).delayElement(Duration.ofSeconds(15)))
.repeat(() -> getOffset() <= 45000);
}
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
它似乎有效,因为记录偏移参数增加但 HTTP 请求的偏移量等于 0。方法returns 前 100 个项目而不是 44566 个项目
问题实际上是 webclient
是在订阅之前急切构建的,并且 "cached" 具有初始 offset
值。每次调用后,Flux
被重新订阅,但准备好的带有偏移量的 Web 服务调用仍然是 "cached"。
您必须以惰性方式提供 weblient
(例如通过将其包装在 lambda 中),这会强制其所有参数在每次调用时重新计算。有一个特殊的运算符 - defer()
.
解决方案:
Mono<Model> response = Mono.defer(() -> webClientBuilder
.build()
.get()
.uri(uriBuilder -> uriBuilder
.path("/getItems")
.queryParam("limit", 100)
.queryParam("offset", getOffset())
.build())
.retrieve()
.bodyToMono(Model.class)
);
Flux.from(response
.doOnEach(System.out::println)
.flatMap(model -> {
setOffset(getOffset() + 100);
log.info("Offset: " + getOffset());
return repository.saveAll(model.getData().getResults()).collectList();
}).delayElement(Duration.ofSeconds(15))
).repeat(() -> getOffset() <= 45000).subscribe();
另一个问题证明了急切执行的相同问题:
我正在尝试构建一种方法,该方法应在一个查询参数小于 45000 时向一个外部端点发出许多 HTTP 请求。
我需要这样做,因为外部端点允许我获取 100 个项目,但有超过 44000 个项目要获取。
private int offset = 0;
public Flux<List<Model>> getItems() {
return Flux.from(
webClientBuilder
.build()
.get()
.uri(uriBuilder -> uriBuilder
.path("/getItems")
.queryParam("limit", 100)
.queryParam("offset", getOffset())
.build())
.retrieve()
.bodyToMono(Model.class)
.doOnSuccess(System.out::println)
.flatMap(model -> {
setOffset(getOffset() + 100);
log.info("Offset: " + getOffset());
return repository.saveAll(model.getData().getResults()).collectList();
}).delayElement(Duration.ofSeconds(15)))
.repeat(() -> getOffset() <= 45000);
}
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
它似乎有效,因为记录偏移参数增加但 HTTP 请求的偏移量等于 0。方法returns 前 100 个项目而不是 44566 个项目
问题实际上是 webclient
是在订阅之前急切构建的,并且 "cached" 具有初始 offset
值。每次调用后,Flux
被重新订阅,但准备好的带有偏移量的 Web 服务调用仍然是 "cached"。
您必须以惰性方式提供 weblient
(例如通过将其包装在 lambda 中),这会强制其所有参数在每次调用时重新计算。有一个特殊的运算符 - defer()
.
解决方案:
Mono<Model> response = Mono.defer(() -> webClientBuilder
.build()
.get()
.uri(uriBuilder -> uriBuilder
.path("/getItems")
.queryParam("limit", 100)
.queryParam("offset", getOffset())
.build())
.retrieve()
.bodyToMono(Model.class)
);
Flux.from(response
.doOnEach(System.out::println)
.flatMap(model -> {
setOffset(getOffset() + 100);
log.info("Offset: " + getOffset());
return repository.saveAll(model.getData().getResults()).collectList();
}).delayElement(Duration.ofSeconds(15))
).repeat(() -> getOffset() <= 45000).subscribe();
另一个问题证明了急切执行的相同问题: