按时间间隔创建的 Observable

Observables created at time interval

我正在查看在给定时间间隔创建的 RxScala 可观察对象:

val periodic: Observable[Long] = Observable.interval(100 millis)

periodic.foreach(x => println(x))

如果我将其放入工作表中,我会得到以下结果:

periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@2cce3493

res0: Unit = ()

这让我感到困惑:periodic 的元素实际上包含什么?

它们包含一些索引吗? 它们是否包含创建它们的时间间隔?

如您在此处所见,http://reactivex.io/documentation/operators/interval.html 生成的元素是从 0.

递增的 Long

至于你的代码和结果:

在这里,您创建可观察对象,并将 Observable[Long] 分配给 periodic。一切如预期。

scala> val periodic: Observable[Long] = Observable.interval(100 millis)
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@2cce3493

在这里,您注册一个回调,即当值被发出时会发生什么。 return 类型的 foreach 方法是 Unit,因为它没有合理的值,只是为了注册回调的副作用。

periodic.foreach(x => println(x))
res0: Unit = ()

您看不到实际值,因为执行已停止。尝试插入 Thread.sleep.

val periodic: Observable[Long] = Observable.interval(100.millis)
periodic.foreach(x => println(x))
Thread.sleep(1000)

给出类似于

的输出
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon@207cb62f

res0: Unit = ()

0
1
2
3
4
5
6
7
8
9
res1: Unit = ()

问题是间隔是异步的,所以你不需要等待结果。

另一种等待结果的方法是使用 TestSubscriber

def interval(): Unit = {
addHeader("Interval observable")
Observable.interval(createDuration(100))
  .map(n => "New item emitted:" + n)
  .doOnNext(s => print("\n" + s))
  .subscribe();
new TestSubscriber[Subscription].awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
 }

您可以在此处查看更多示例https://github.com/politrons/reactiveScala