在 Flux 中生成分页 WebClient 请求和消费响应
Generating paged WebClient requests and consuming response in a Flux
我向第三方 Web 服务发出重复的分页 WebClient 请求。我现在的实现可以工作但是阻塞。
到目前为止我的实施:
var elementsPerPage = 10;
Flux
.generate(
() -> 0,
(pageIndex, emitter) -> {
BlahServiceResponse blahServiceResponse =
webClient
.get()
.uri("/blah?pageIndex={pageIndex}", pageIndex)
.retrieve()
.bodyToMono(BlahServiceResponse.class)
.block(); // Yuck!!!
if (blahServiceResponse.getStudents().size() > 0) {
emitter.next(blahServiceResponse);
} else {
emitter.complete();
}
return pageIndex + elementsPerPage;
}
)
.subscribe(System.out::println); // Replace me with actual logic
出于可以理解的原因,如果将上面的代码更改为以下内容,则会抛出“IllegalStateException:生成器未调用任何 SynchronousSink 方法”异常:
webClient
.get()
...
.bodyToMono(BlahServiceResponse.class)
.subscribe(emitter::next);
所以我开始寻找异步 Sink 和 realized it was Flux|MonoSink。但据我所知,Flux 中没有支持使用 Flux|MonoSink 生成有状态元素的构建器方法。
我是不是遗漏了什么,是否有更优雅的方法?
静态分页
如果您事先知道页面索引并且您有生成它的规则。
var elementsPerPage = 10;
Flux.generate(
() -> 0,
(pageIndex, emitter) -> {
if (pageIndex < 30) {
emitter.next(pageIndex);
} else {
emitter.complete();
}
return pageIndex + elementsPerPage;
})
.flatMap(pageIndex -> webClient
.get()
.uri("/blah?pageIndex={pageIndex}", pageIndex)
.retrieve()
.bodyToMono(BlahServiceResponse.class))
.subscribe(System.out::println);
动态分页
如果下一页索引依赖于上次查询的页面。
public static void main(String[] args) {
var elementsPerPage = 10;
callWithPageIndex(0)
.expand(pagedResponse -> {
if (pagedResponse.getResponse().isEmpty()) {
return Mono.empty();
} else {
return callWithPageIndex(pagedResponse.getPageIndex() + elementsPerPage);
}
})
.subscribe(System.out::println);
}
private static Mono<PagedResponse<BlahServiceResponse>> callWithPageIndex(Integer pageIndex) {
return webClient
.get()
.uri("/blah?pageIndex={pageIndex}", pageIndex)
.retrieve()
.bodyToMono(BlahServiceResponse.class)
.map(response -> new PagedResponse<>(pageIndex, response));
}
@lombok.Value
static class PagedResponse<T> {
int pageIndex;
T response;
}
我向第三方 Web 服务发出重复的分页 WebClient 请求。我现在的实现可以工作但是阻塞。
到目前为止我的实施:
var elementsPerPage = 10;
Flux
.generate(
() -> 0,
(pageIndex, emitter) -> {
BlahServiceResponse blahServiceResponse =
webClient
.get()
.uri("/blah?pageIndex={pageIndex}", pageIndex)
.retrieve()
.bodyToMono(BlahServiceResponse.class)
.block(); // Yuck!!!
if (blahServiceResponse.getStudents().size() > 0) {
emitter.next(blahServiceResponse);
} else {
emitter.complete();
}
return pageIndex + elementsPerPage;
}
)
.subscribe(System.out::println); // Replace me with actual logic
出于可以理解的原因,如果将上面的代码更改为以下内容,则会抛出“IllegalStateException:生成器未调用任何 SynchronousSink 方法”异常:
webClient
.get()
...
.bodyToMono(BlahServiceResponse.class)
.subscribe(emitter::next);
所以我开始寻找异步 Sink 和 realized it was Flux|MonoSink。但据我所知,Flux 中没有支持使用 Flux|MonoSink 生成有状态元素的构建器方法。
我是不是遗漏了什么,是否有更优雅的方法?
静态分页
如果您事先知道页面索引并且您有生成它的规则。
var elementsPerPage = 10;
Flux.generate(
() -> 0,
(pageIndex, emitter) -> {
if (pageIndex < 30) {
emitter.next(pageIndex);
} else {
emitter.complete();
}
return pageIndex + elementsPerPage;
})
.flatMap(pageIndex -> webClient
.get()
.uri("/blah?pageIndex={pageIndex}", pageIndex)
.retrieve()
.bodyToMono(BlahServiceResponse.class))
.subscribe(System.out::println);
动态分页
如果下一页索引依赖于上次查询的页面。
public static void main(String[] args) {
var elementsPerPage = 10;
callWithPageIndex(0)
.expand(pagedResponse -> {
if (pagedResponse.getResponse().isEmpty()) {
return Mono.empty();
} else {
return callWithPageIndex(pagedResponse.getPageIndex() + elementsPerPage);
}
})
.subscribe(System.out::println);
}
private static Mono<PagedResponse<BlahServiceResponse>> callWithPageIndex(Integer pageIndex) {
return webClient
.get()
.uri("/blah?pageIndex={pageIndex}", pageIndex)
.retrieve()
.bodyToMono(BlahServiceResponse.class)
.map(response -> new PagedResponse<>(pageIndex, response));
}
@lombok.Value
static class PagedResponse<T> {
int pageIndex;
T response;
}