RxScala 订阅多观察者只向第一个观察者发送事件

RxScala subscribe with multi Observer just emit event to first one

我尝试使用 multi Observer 订阅 Observable 其中 onNext 发生在 loop.It 似乎对每个观察者都不起作用。

import rx.lang.scala.Observable

object SubscribeMultiEvent extends App{
  val obv = Observable.apply[String]{ s =>
    def printForever: Unit = {
      s.onNext("hi~")
      Thread.sleep(1000)
      printForever
    }
    printForever
  }

  obv.subscribe(s => println(s"first observer - $s"))
  obv.subscribe(s => println(s"second observer - $s"))

  Thread.currentThread().join()
}

只响应第一个观察者

first observer - hi~
first observer - hi~
...

为什么第二个收不到订阅?谢谢

您代码中的问题是您的 Observable 是同步的。这意味着第二个 subscribe 不会 运行 直到第一个 subscribe 完成。由于您的 Observable 从未完成,因此第二个 subscribe 将无法 运行.

要解决此问题,您需要使 Observable 异步。您可以在另一个线程中使用 subscribeOn 到 运行。例如,

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

object SubscribeMultiEvent extends App{
  val obv = Observable.apply[String]{ s =>
    def printForever: Unit = {
      s.onNext("hi~")
      Thread.sleep(1000)
      printForever
    }
    printForever
  }.subscribeOn(NewThreadScheduler())

  obv.subscribe(s => println(s"first observer - $s"))
  obv.subscribe(s => println(s"second observer - $s"))

  Thread.sleep(60000)
}

Thread.sleep(60000) 最后很重要。 RxJava的Threads默认是daemon,如果主线程结束了,因为没有非daemon线程了,JVM就会退出。为了防止主线程停止,您需要添加类似 Thread.sleep(60000).

的内容