在 Spring WebFlux 中使用 .repeatWhen() 根据数据库资源重复订阅
Repeatedly subscribe based on DB resource with .repeatWhen() in Spring WebFlux
我想根据数据库中某些资源的状态重复执行 Flux process()。例如,如果资源中的元素数组不为空,则重复 process()。看起来 operator repeatWhen 适合我的目的 - 允许订阅具有资源的发布者。这是一个代码片段:
private Consumer<Signal<String>> processOnNewThread() {
return signal -> {
final var resourceId = signal.get();
if (resourceId == null) return;
this.process(resourceId)
.repeatWhen(repeat -> Mono.defer(() -> repo.findById(resourceId)
// filter to end repeat
.filter(r -> !r.getElems().isEmpty())
// return Mono with complete signal to repeat
.map(r -> r.getElems().size())))
.collectList()
.contextWrite(stateSignal.getContextView())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
};
}
private Flux<String> process(String resourceId) { ... }
这段代码有两个问题:
- repo.findById(resourceId) 在 process() 方法之前执行,尽管 Mono.defer()
- 当 elems 为空时,重复序列以空信号完成,这不会导致重复结束,但会导致整个过程结束
关于如何检查新资源然后继续或完成重复的任何想法?
我设法通过更改运算符 .repeatWhen 实现了预期的结果,如下所示:
.repeatWhen(repeat -> repeat.flatMap(r -> Mono.defer(() -> repo.findById(resourceId)
.map(r -> r.getElems().size())))
.handle((nextRepeat, sink) -> {
// if elem size > 0 - repeat process
if (nextRepeat > 0) sink.next(nextRepeat);
else sink.complete();
}))
对进一步的链使用Flux repeat允许操作员Mono.defer() will execute correctly and get a fresh resource at each repeat check. The handle()操作员直接执行延长或结束重新订阅。因此,这解决了我遇到的问题
我想根据数据库中某些资源的状态重复执行 Flux process()。例如,如果资源中的元素数组不为空,则重复 process()。看起来 operator repeatWhen 适合我的目的 - 允许订阅具有资源的发布者。这是一个代码片段:
private Consumer<Signal<String>> processOnNewThread() {
return signal -> {
final var resourceId = signal.get();
if (resourceId == null) return;
this.process(resourceId)
.repeatWhen(repeat -> Mono.defer(() -> repo.findById(resourceId)
// filter to end repeat
.filter(r -> !r.getElems().isEmpty())
// return Mono with complete signal to repeat
.map(r -> r.getElems().size())))
.collectList()
.contextWrite(stateSignal.getContextView())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
};
}
private Flux<String> process(String resourceId) { ... }
这段代码有两个问题:
- repo.findById(resourceId) 在 process() 方法之前执行,尽管 Mono.defer()
- 当 elems 为空时,重复序列以空信号完成,这不会导致重复结束,但会导致整个过程结束
关于如何检查新资源然后继续或完成重复的任何想法?
我设法通过更改运算符 .repeatWhen 实现了预期的结果,如下所示:
.repeatWhen(repeat -> repeat.flatMap(r -> Mono.defer(() -> repo.findById(resourceId)
.map(r -> r.getElems().size())))
.handle((nextRepeat, sink) -> {
// if elem size > 0 - repeat process
if (nextRepeat > 0) sink.next(nextRepeat);
else sink.complete();
}))
对进一步的链使用Flux repeat允许操作员Mono.defer() will execute correctly and get a fresh resource at each repeat check. The handle()操作员直接执行延长或结束重新订阅。因此,这解决了我遇到的问题