带回放的 RxScala Observables
RxScala Observables with replay
我正在尝试理解 RxScala 中的 replay
。我创建了一个这样的可观察对象:
lazy val toyObservable : Observable[Int] = {
val coldObservable : Observable[Int] = intPerSecond
val hotObservable : ConnectableObservable[Int] = coldObservable.publish
val cachedObservable = hotObservable //.replay(3) //<<<<<<<<< ODD THING
cachedObservable.connect
cachedObservable
}
其中 intPerSecond
从 0 开始每秒输出一个整数。第一个订阅的观察者确实每秒看到一个整数。如果第二个观察者在 t=6 秒时加入,那么从那时起他们都会以一秒的间隔看到匹配的流 6...7...8...9...。符合预期。
现在如果我添加 .replay(3)
我希望当第二个观察者加入时,他会看到 3456...7...8...9,即他会立即从缓存中获取 3 个整数,然后在生成它们时以每秒一个的速度接收它们。但是相反,现在两个观察者都看不到任何东西。我的语法有误吗?
您忘记打电话给 hotObservable.connect
。以下代码输出正是你想要的:
import rx.lang.scala._
import rx.lang.scala.observables._
import scala.concurrent.duration._
val intPerSecond = Observable.interval(1.seconds).map(_.toInt)
val coldObservable : Observable[Int] = intPerSecond
val hotObservable : ConnectableObservable[Int] = coldObservable.publish
val cachedObservable = hotObservable.replay(3)
cachedObservable.connect
hotObservable.connect
cachedObservable.foreach(i => println(s"1: $i"))
Thread.sleep(6000)
cachedObservable.foreach(i => println(s"2: $i"))
我正在尝试理解 RxScala 中的 replay
。我创建了一个这样的可观察对象:
lazy val toyObservable : Observable[Int] = {
val coldObservable : Observable[Int] = intPerSecond
val hotObservable : ConnectableObservable[Int] = coldObservable.publish
val cachedObservable = hotObservable //.replay(3) //<<<<<<<<< ODD THING
cachedObservable.connect
cachedObservable
}
其中 intPerSecond
从 0 开始每秒输出一个整数。第一个订阅的观察者确实每秒看到一个整数。如果第二个观察者在 t=6 秒时加入,那么从那时起他们都会以一秒的间隔看到匹配的流 6...7...8...9...。符合预期。
现在如果我添加 .replay(3)
我希望当第二个观察者加入时,他会看到 3456...7...8...9,即他会立即从缓存中获取 3 个整数,然后在生成它们时以每秒一个的速度接收它们。但是相反,现在两个观察者都看不到任何东西。我的语法有误吗?
您忘记打电话给 hotObservable.connect
。以下代码输出正是你想要的:
import rx.lang.scala._
import rx.lang.scala.observables._
import scala.concurrent.duration._
val intPerSecond = Observable.interval(1.seconds).map(_.toInt)
val coldObservable : Observable[Int] = intPerSecond
val hotObservable : ConnectableObservable[Int] = coldObservable.publish
val cachedObservable = hotObservable.replay(3)
cachedObservable.connect
hotObservable.connect
cachedObservable.foreach(i => println(s"1: $i"))
Thread.sleep(6000)
cachedObservable.foreach(i => println(s"2: $i"))