可观察的 Rx Scala 行为

Observable Rx Scala behavior

以下示例来自我目前正在阅读的有关 Observables 的一本书:

object ObservablesCreate extends App {
  val vms = Observable.create[String] { obs =>
    obs.onNext("JVM")
    obs.onNext("DartVM")
    obs.onNext("V8")
    obs.onCompleted()
    Subscription()
  }
  vms.subscribe(log _, e => log(s"oops - $e"), () => log("Done!"))
}

后面的文字暗示上面的代码片段有一个非常容易理解的同步订阅方法。我的问题是,这不是使用 Observables 进行异步事件发射的全部目的吗?我的意思是这里与 Iterable 没有区别。这是为了某种解释而解释的吗?

是的,只是为了解释create工厂方法。在该片段中,您在订阅者订阅时同步生成元素。

但是您可以用同样的方式异步调用 onNext,例如当 Future 解析时:

object ObservablesCreate extends App {
    def future: Future[String] = ???
    val vms = Observable.create[String] { obs =>
        val f = future
        f onComplete {
            case Success(s) => {
                obs.onNext(s)
                obs.onCompleted()
            }
            case Failure(exception) => obs.onError(exception)
        }

        Subscription()
    }

    vms.subscribe(log _, e => log(s"oops - $e"), () => log("Done!"))
}

(对于这种特定情况,使用 Observable.from 方法将 Future 转换为 Observable 更好)