仅在需要时在 Reactor 的 Flux 中请求 next

Request next in Reactor's Flux only when needed

我有一个 API returns 实体列表,实体列表上限为 100 个。如果有更多实体,它 returns 下一页的标记。

我想创建一个通量 returns 所有实体(所有页面的),但仅在需要时(如果请求)。

我写了这段代码:

class Page {
    String token;
    List<Object> entities;
}

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
    });
}

而且有效 - 差不多

如果我请求 99 个元素,则加载第一页并且我的 flux 包含 99 个元素。

如果我请求 150 个元素,则加载第一页和第二页并且我的 flux 包含 150 个元素。

但是,如果我请求 100 个元素,则会加载第一页和第二页(并且我的 flux 包含这 100 个元素)。我的问题是第二页加载时我没有请求第 101 个元素。

当前行为:

subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100

预期是:第 2 页的加载发生在最后一个请求之后(1)

好像哪里有预取,但我没看到哪里。有什么想法吗?

好的,我找到了。每个人都没有预取。实际上是 Flux.defer 在订阅时加载下一页,而不是在请求时加载。

解决该问题的快速(但不完善)测试是:

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux
                .fromIterable(page.entities)
                .concatWith(
                        // Flux.defer(() -> load(page.token, fct))
                        Flux.create(s -> {
                            DelegateSubscriber[] ref = new DelegateSubscriber[1];

                            s.onRequest(l -> {
                                if (ref[0] == null) {
                                    ref[0] = new DelegateSubscriber(s);
                                    load(page.token, fct).subscribe(ref[0]);
                                }
                                ref[0].request(l);
                            });
                        }));
    });
}

static class DelegateSubscriber extends BaseSubscriber<Object> {

    FluxSink<Object> delegate;

    public DelegateSubscriber(final FluxSink<Object> delegate) {
        this.delegate = delegate;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // nothing
    }

    @Override
    protected void hookOnNext(Object value) {
        delegate.next(value);
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        delegate.error(throwable);
    }
}