在反应式编程中,doOnNext 调用的顺序是否有保证?
In reactive programming, is the sequence of doOnNext calls guaranteed?
响应式编程新手。我的 Flux 中有一系列调用,我需要确保它们按顺序完成。例如
Flux<Thing> flux = ...
.doOnNext(this::sendThing)
.doOnNext(this::persistThing)
.doOnError(error -> log.error("", error))
.blockLast();
我需要确保 sendThing
在 persistThing
之前完成。我不清楚,作为一个被动的新手,如果有保证的话。
不熟悉这种反应式实现,但看起来您正在向单个可观察对象添加两个独立的订阅者,这不能保证执行顺序。
如果 send 和 persist 是阻塞(同步)函数,您可以创建一些 'sendAndSync' 函数。
否则,您需要将 'send' 设置为自身并可观察,并且 'persist' 应该是其订阅者。您可以通过 'send' 在发送完成后写入 PublishSubject 并 'persist' 成为该 PublishSubject 的订阅者(通过 doOnNext)来实现。
The Consumer is executed first, then the onNext signal is propagated
downstream.
因此,this::sendThing
消费者将在onNext
信号之前完成
传播到 this::persistThing
.
示例:
Flux.range(1, 3)
.doOnNext(n -> log.info("doOnNext1"))
.log()
.doOnNext(n -> log.info("doOnNext2"))
.blockLast();
上面的代码片段打印了以下内容:
- doOnNext1
- | onNext(1)
- doOnNext2
- doOnNext1
- | onNext(2)
- doOnNext2
- doOnNext1
- | onNext(3)
- doOnNext2
onNext
信号总是在打印 "doOnNext2"
之前传播。
请记住,doOn 方法应仅用于副作用操作(例如日志)。
响应式编程新手。我的 Flux 中有一系列调用,我需要确保它们按顺序完成。例如
Flux<Thing> flux = ...
.doOnNext(this::sendThing)
.doOnNext(this::persistThing)
.doOnError(error -> log.error("", error))
.blockLast();
我需要确保 sendThing
在 persistThing
之前完成。我不清楚,作为一个被动的新手,如果有保证的话。
不熟悉这种反应式实现,但看起来您正在向单个可观察对象添加两个独立的订阅者,这不能保证执行顺序。
如果 send 和 persist 是阻塞(同步)函数,您可以创建一些 'sendAndSync' 函数。
否则,您需要将 'send' 设置为自身并可观察,并且 'persist' 应该是其订阅者。您可以通过 'send' 在发送完成后写入 PublishSubject 并 'persist' 成为该 PublishSubject 的订阅者(通过 doOnNext)来实现。
The Consumer is executed first, then the onNext signal is propagated downstream.
因此,this::sendThing
消费者将在onNext
信号之前完成
传播到 this::persistThing
.
示例:
Flux.range(1, 3)
.doOnNext(n -> log.info("doOnNext1"))
.log()
.doOnNext(n -> log.info("doOnNext2"))
.blockLast();
上面的代码片段打印了以下内容:
- doOnNext1
- | onNext(1)
- doOnNext2
- doOnNext1
- | onNext(2)
- doOnNext2
- doOnNext1
- | onNext(3)
- doOnNext2
onNext
信号总是在打印 "doOnNext2"
之前传播。
请记住,doOn 方法应仅用于副作用操作(例如日志)。