Reactor:为什么我的代码适用于 publishOn,但不适用于 subscribeOn

Reactor: why my code works with publishOn, but not with subscribeOn

我正在学习调度程序的工作原理。所以我尝试将数字设置为 100。使用 publishOn 它可以工作(它将数字设置为 100),但是使用 subscribeOn 它不会将数字设置为 100。我不明白为什么?

@Test
fun reactor01_LearnSchedulers(){
  var number = 0
  Mono.just(100)
    .doOnNext { numb -> number = numb }
    // .subscribeOn(Schedulers.boundedElastic()) this gives result: 0
    .publishOn(Schedulers.boundedElastic()) // this gives result: 100
    .subscribe()

  println("Result : $number")
}

更新: 添加一些代码后,我发现结果不一致。

fun reactor01_LearnSchedulers(){
  var number = 0
  Mono.just(100)
    .doOnNext { numb -> number = numb }
    .doOnNext { println("Result from inside: $number") }
    .publishOn(Schedulers.boundedElastic())
    .subscribe()

  println("Result from outside: $number")
  // with publishOn Result from inside: 100, Result from outside: 100
  // with subscribeOn Result from outside: 0, Result from inside: 100
}

这里的问题是计时和理解线程转移在 Reactor 中的工作原理。

当您执行 subscribe() 时,您将在具有该反应流定义的同一线程上或在您在 subscribeOn().

中指定的线程上执行此操作

publishOn()在这里是独立的,不会影响订阅者的行为。

因此,当您有 publishOn() 但没有 subscribeOn() 时,您仍然可以使流程在与整个测试方法相同的线程中执行。 println()subscribe() 和整个流评估之后立即执行。但是,当您添加 subscribeOn() 时,流程的订阅和执行发生在该线程上,从而使主线程可以自由地继续执行代码中的下一条语句。因此,由于您处于异步情况,因此将 number 设置为预期值并不幸运。

我建议您学习 StepVerifier 以了解如何测试所有这些 Reactor 流:https://projectreactor.io/docs/core/release/reference/#testing