在 Flux 中生成分页 WebClient 请求和消费响应

Generating paged WebClient requests and consuming response in a Flux

我向第三方 Web 服务发出重复的分页 WebClient 请求。我现在的实现可以工作但是阻塞

到目前为止我的实施:

var elementsPerPage = 10;
Flux
    .generate(
        () -> 0,
        (pageIndex, emitter) -> {
            BlahServiceResponse blahServiceResponse =
                webClient
                    .get()
                    .uri("/blah?pageIndex={pageIndex}", pageIndex)
                    .retrieve()
                    .bodyToMono(BlahServiceResponse.class)
                    .block(); // Yuck!!!
            if (blahServiceResponse.getStudents().size() > 0) {
                emitter.next(blahServiceResponse);
            } else {
                emitter.complete();
            }
            return pageIndex + elementsPerPage;
        }
    )
    .subscribe(System.out::println); // Replace me with actual logic

出于可以理解的原因,如果将上面的代码更改为以下内容,则会抛出“IllegalStateException:生成器未调用任何 SynchronousSink 方法”异常:

webClient
    .get()
    ...
    .bodyToMono(BlahServiceResponse.class)
    .subscribe(emitter::next);

所以我开始寻找异步 Sink 和 realized it was Flux|MonoSink。但据我所知,Flux 中没有支持使用 Flux|MonoSink 生成有状态元素的构建器方法。

我是不是遗漏了什么,是否有更优雅的方法?

静态分页

如果您事先知道页面索引并且您有生成它的规则。

var elementsPerPage = 10;

Flux.generate(
        () -> 0,
        (pageIndex, emitter) -> {
            if (pageIndex < 30) {
                emitter.next(pageIndex);
            } else {
                emitter.complete();
            }
            return pageIndex + elementsPerPage;
        })
        .flatMap(pageIndex -> webClient
                .get()
                .uri("/blah?pageIndex={pageIndex}", pageIndex)
                .retrieve()
                .bodyToMono(BlahServiceResponse.class))
        .subscribe(System.out::println);

动态分页

如果下一页索引依赖于上次查询的页面。

public static void main(String[] args) {
    var elementsPerPage = 10;

    callWithPageIndex(0)
            .expand(pagedResponse -> {
                if (pagedResponse.getResponse().isEmpty()) {
                    return Mono.empty();
                } else {
                    return callWithPageIndex(pagedResponse.getPageIndex() + elementsPerPage);
                }
            })
            .subscribe(System.out::println);
}

private static Mono<PagedResponse<BlahServiceResponse>> callWithPageIndex(Integer pageIndex) {
    return webClient
            .get()
            .uri("/blah?pageIndex={pageIndex}", pageIndex)
            .retrieve()
            .bodyToMono(BlahServiceResponse.class)
            .map(response -> new PagedResponse<>(pageIndex, response));
}

@lombok.Value
static class PagedResponse<T> {
    int pageIndex;
    T response;
}