在反应式编程中,doOnNext 调用的顺序是否有保证?

In reactive programming, is the sequence of doOnNext calls guaranteed?

响应式编程新手。我的 Flux 中有一系列调用,我需要确保它们按顺序完成。例如

Flux<Thing> flux = ...
.doOnNext(this::sendThing)
.doOnNext(this::persistThing)
.doOnError(error -> log.error("", error))
.blockLast();

我需要确保 sendThingpersistThing 之前完成。我不清楚,作为一个被动的新手,如果有保证的话。

不熟悉这种反应式实现,但看起来您正在向单个可观察对象添加两个独立的订阅者,这不能保证执行顺序。

如果 send 和 persist 是阻塞(同步)函数,您可以创建一些 'sendAndSync' 函数。

否则,您需要将 'send' 设置为自身并可观察,并且 'persist' 应该是其订阅者。您可以通过 'send' 在发送完成后写入 PublishSubject 并 'persist' 成为该 PublishSubject 的订阅者(通过 doOnNext)来实现。

根据documentation

The Consumer is executed first, then the onNext signal is propagated downstream.

因此,this::sendThing消费者将在onNext信号之前完成 传播到 this::persistThing.

示例:

    Flux.range(1, 3)
        .doOnNext(n -> log.info("doOnNext1"))
        .log()
        .doOnNext(n -> log.info("doOnNext2"))
        .blockLast();

上面的代码片段打印了以下内容:

  • doOnNext1
  • | onNext(1)
  • doOnNext2
  • doOnNext1
  • | onNext(2)
  • doOnNext2
  • doOnNext1
  • | onNext(3)
  • doOnNext2

onNext 信号总是在打印 "doOnNext2" 之前传播。

请记住,doOn 方法应仅用于副作用操作(例如日志)。