按时间间隔创建的 Observable
Observables created at time interval
我正在查看在给定时间间隔创建的 RxScala 可观察对象:
val periodic: Observable[Long] = Observable.interval(100 millis)
periodic.foreach(x => println(x))
如果我将其放入工作表中,我会得到以下结果:
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@2cce3493
res0: Unit = ()
这让我感到困惑:periodic
的元素实际上包含什么?
它们包含一些索引吗?
它们是否包含创建它们的时间间隔?
如您在此处所见,http://reactivex.io/documentation/operators/interval.html 生成的元素是从 0
.
递增的 Long
值
至于你的代码和结果:
在这里,您创建可观察对象,并将 Observable[Long]
分配给 periodic
。一切如预期。
scala> val periodic: Observable[Long] = Observable.interval(100 millis)
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@2cce3493
在这里,您注册一个回调,即当值被发出时会发生什么。 return 类型的 foreach
方法是 Unit
,因为它没有合理的值,只是为了注册回调的副作用。
periodic.foreach(x => println(x))
res0: Unit = ()
您看不到实际值,因为执行已停止。尝试插入 Thread.sleep
.
val periodic: Observable[Long] = Observable.interval(100.millis)
periodic.foreach(x => println(x))
Thread.sleep(1000)
给出类似于
的输出
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@207cb62f
res0: Unit = ()
0
1
2
3
4
5
6
7
8
9
res1: Unit = ()
问题是间隔是异步的,所以你不需要等待结果。
另一种等待结果的方法是使用 TestSubscriber
def interval(): Unit = {
addHeader("Interval observable")
Observable.interval(createDuration(100))
.map(n => "New item emitted:" + n)
.doOnNext(s => print("\n" + s))
.subscribe();
new TestSubscriber[Subscription].awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}
您可以在此处查看更多示例https://github.com/politrons/reactiveScala
我正在查看在给定时间间隔创建的 RxScala 可观察对象:
val periodic: Observable[Long] = Observable.interval(100 millis)
periodic.foreach(x => println(x))
如果我将其放入工作表中,我会得到以下结果:
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@2cce3493
res0: Unit = ()
这让我感到困惑:periodic
的元素实际上包含什么?
它们包含一些索引吗? 它们是否包含创建它们的时间间隔?
如您在此处所见,http://reactivex.io/documentation/operators/interval.html 生成的元素是从 0
.
Long
值
至于你的代码和结果:
在这里,您创建可观察对象,并将 Observable[Long]
分配给 periodic
。一切如预期。
scala> val periodic: Observable[Long] = Observable.interval(100 millis)
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@2cce3493
在这里,您注册一个回调,即当值被发出时会发生什么。 return 类型的 foreach
方法是 Unit
,因为它没有合理的值,只是为了注册回调的副作用。
periodic.foreach(x => println(x))
res0: Unit = ()
您看不到实际值,因为执行已停止。尝试插入 Thread.sleep
.
val periodic: Observable[Long] = Observable.interval(100.millis)
periodic.foreach(x => println(x))
Thread.sleep(1000)
给出类似于
的输出periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@207cb62f
res0: Unit = ()
0
1
2
3
4
5
6
7
8
9
res1: Unit = ()
问题是间隔是异步的,所以你不需要等待结果。
另一种等待结果的方法是使用 TestSubscriber
def interval(): Unit = {
addHeader("Interval observable")
Observable.interval(createDuration(100))
.map(n => "New item emitted:" + n)
.doOnNext(s => print("\n" + s))
.subscribe();
new TestSubscriber[Subscription].awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}
您可以在此处查看更多示例https://github.com/politrons/reactiveScala