仅在需要时在 Reactor 的 Flux 中请求 next
Request next in Reactor's Flux only when needed
我有一个 API returns 实体列表,实体列表上限为 100 个。如果有更多实体,它 returns 下一页的标记。
我想创建一个通量 returns 所有实体(所有页面的),但仅在需要时(如果请求)。
我写了这段代码:
class Page {
String token;
List<Object> entities;
}
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
return fct.apply(token).flatMapMany(page -> {
if (page.token == null) {
// no more pages
return Flux.fromIterable(page.entities);
}
return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
});
}
而且有效 - 差不多
如果我请求 99 个元素,则加载第一页并且我的 flux 包含 99 个元素。
如果我请求 150 个元素,则加载第一页和第二页并且我的 flux 包含 150 个元素。
但是,如果我请求 100 个元素,则会加载第一页和第二页(并且我的 flux 包含这 100 个元素)。我的问题是第二页加载时我没有请求第 101 个元素。
当前行为:
subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100
预期是:第 2 页的加载发生在最后一个请求之后(1)
好像哪里有预取,但我没看到哪里。有什么想法吗?
好的,我找到了。每个人都没有预取。实际上是 Flux.defer
在订阅时加载下一页,而不是在请求时加载。
解决该问题的快速(但不完善)测试是:
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
return fct.apply(token).flatMapMany(page -> {
if (page.token == null) {
// no more pages
return Flux.fromIterable(page.entities);
}
return Flux
.fromIterable(page.entities)
.concatWith(
// Flux.defer(() -> load(page.token, fct))
Flux.create(s -> {
DelegateSubscriber[] ref = new DelegateSubscriber[1];
s.onRequest(l -> {
if (ref[0] == null) {
ref[0] = new DelegateSubscriber(s);
load(page.token, fct).subscribe(ref[0]);
}
ref[0].request(l);
});
}));
});
}
static class DelegateSubscriber extends BaseSubscriber<Object> {
FluxSink<Object> delegate;
public DelegateSubscriber(final FluxSink<Object> delegate) {
this.delegate = delegate;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
// nothing
}
@Override
protected void hookOnNext(Object value) {
delegate.next(value);
}
@Override
protected void hookOnError(Throwable throwable) {
delegate.error(throwable);
}
}
我有一个 API returns 实体列表,实体列表上限为 100 个。如果有更多实体,它 returns 下一页的标记。
我想创建一个通量 returns 所有实体(所有页面的),但仅在需要时(如果请求)。
我写了这段代码:
class Page {
String token;
List<Object> entities;
}
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
return fct.apply(token).flatMapMany(page -> {
if (page.token == null) {
// no more pages
return Flux.fromIterable(page.entities);
}
return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
});
}
而且有效 - 差不多
如果我请求 99 个元素,则加载第一页并且我的 flux 包含 99 个元素。
如果我请求 150 个元素,则加载第一页和第二页并且我的 flux 包含 150 个元素。
但是,如果我请求 100 个元素,则会加载第一页和第二页(并且我的 flux 包含这 100 个元素)。我的问题是第二页加载时我没有请求第 101 个元素。
当前行为:
subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100
预期是:第 2 页的加载发生在最后一个请求之后(1)
好像哪里有预取,但我没看到哪里。有什么想法吗?
好的,我找到了。每个人都没有预取。实际上是 Flux.defer
在订阅时加载下一页,而不是在请求时加载。
解决该问题的快速(但不完善)测试是:
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
return fct.apply(token).flatMapMany(page -> {
if (page.token == null) {
// no more pages
return Flux.fromIterable(page.entities);
}
return Flux
.fromIterable(page.entities)
.concatWith(
// Flux.defer(() -> load(page.token, fct))
Flux.create(s -> {
DelegateSubscriber[] ref = new DelegateSubscriber[1];
s.onRequest(l -> {
if (ref[0] == null) {
ref[0] = new DelegateSubscriber(s);
load(page.token, fct).subscribe(ref[0]);
}
ref[0].request(l);
});
}));
});
}
static class DelegateSubscriber extends BaseSubscriber<Object> {
FluxSink<Object> delegate;
public DelegateSubscriber(final FluxSink<Object> delegate) {
this.delegate = delegate;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
// nothing
}
@Override
protected void hookOnNext(Object value) {
delegate.next(value);
}
@Override
protected void hookOnError(Throwable throwable) {
delegate.error(throwable);
}
}