RxScala 订阅多观察者只向第一个观察者发送事件
RxScala subscribe with multi Observer just emit event to first one
我尝试使用 multi Observer
订阅 Observable
其中 onNext
发生在 loop.It 似乎对每个观察者都不起作用。
import rx.lang.scala.Observable
object SubscribeMultiEvent extends App{
val obv = Observable.apply[String]{ s =>
def printForever: Unit = {
s.onNext("hi~")
Thread.sleep(1000)
printForever
}
printForever
}
obv.subscribe(s => println(s"first observer - $s"))
obv.subscribe(s => println(s"second observer - $s"))
Thread.currentThread().join()
}
只响应第一个观察者
first observer - hi~
first observer - hi~
...
为什么第二个收不到订阅?谢谢
您代码中的问题是您的 Observable
是同步的。这意味着第二个 subscribe
不会 运行 直到第一个 subscribe
完成。由于您的 Observable
从未完成,因此第二个 subscribe
将无法 运行.
要解决此问题,您需要使 Observable
异步。您可以在另一个线程中使用 subscribeOn
到 运行。例如,
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler
object SubscribeMultiEvent extends App{
val obv = Observable.apply[String]{ s =>
def printForever: Unit = {
s.onNext("hi~")
Thread.sleep(1000)
printForever
}
printForever
}.subscribeOn(NewThreadScheduler())
obv.subscribe(s => println(s"first observer - $s"))
obv.subscribe(s => println(s"second observer - $s"))
Thread.sleep(60000)
}
Thread.sleep(60000)
最后很重要。 RxJava的Threads默认是daemon,如果主线程结束了,因为没有非daemon线程了,JVM就会退出。为了防止主线程停止,您需要添加类似 Thread.sleep(60000)
.
的内容
我尝试使用 multi Observer
订阅 Observable
其中 onNext
发生在 loop.It 似乎对每个观察者都不起作用。
import rx.lang.scala.Observable
object SubscribeMultiEvent extends App{
val obv = Observable.apply[String]{ s =>
def printForever: Unit = {
s.onNext("hi~")
Thread.sleep(1000)
printForever
}
printForever
}
obv.subscribe(s => println(s"first observer - $s"))
obv.subscribe(s => println(s"second observer - $s"))
Thread.currentThread().join()
}
只响应第一个观察者
first observer - hi~
first observer - hi~
...
为什么第二个收不到订阅?谢谢
您代码中的问题是您的 Observable
是同步的。这意味着第二个 subscribe
不会 运行 直到第一个 subscribe
完成。由于您的 Observable
从未完成,因此第二个 subscribe
将无法 运行.
要解决此问题,您需要使 Observable
异步。您可以在另一个线程中使用 subscribeOn
到 运行。例如,
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler
object SubscribeMultiEvent extends App{
val obv = Observable.apply[String]{ s =>
def printForever: Unit = {
s.onNext("hi~")
Thread.sleep(1000)
printForever
}
printForever
}.subscribeOn(NewThreadScheduler())
obv.subscribe(s => println(s"first observer - $s"))
obv.subscribe(s => println(s"second observer - $s"))
Thread.sleep(60000)
}
Thread.sleep(60000)
最后很重要。 RxJava的Threads默认是daemon,如果主线程结束了,因为没有非daemon线程了,JVM就会退出。为了防止主线程停止,您需要添加类似 Thread.sleep(60000)
.