可观察的 Rx Scala 行为
Observable Rx Scala behavior
以下示例来自我目前正在阅读的有关 Observables 的一本书:
object ObservablesCreate extends App {
val vms = Observable.create[String] { obs =>
 obs.onNext("JVM")
obs.onNext("DartVM")
obs.onNext("V8")
obs.onCompleted()
Subscription()
}
vms.subscribe(log _, e => log(s"oops - $e"), () => log("Done!"))
}
后面的文字暗示上面的代码片段有一个非常容易理解的同步订阅方法。我的问题是,这不是使用 Observables 进行异步事件发射的全部目的吗?我的意思是这里与 Iterable 没有区别。这是为了某种解释而解释的吗?
是的,只是为了解释create
工厂方法。在该片段中,您在订阅者订阅时同步生成元素。
但是您可以用同样的方式异步调用 onNext
,例如当 Future
解析时:
object ObservablesCreate extends App {
def future: Future[String] = ???
val vms = Observable.create[String] { obs =>
val f = future
f onComplete {
case Success(s) => {
obs.onNext(s)
obs.onCompleted()
}
case Failure(exception) => obs.onError(exception)
}
Subscription()
}
vms.subscribe(log _, e => log(s"oops - $e"), () => log("Done!"))
}
(对于这种特定情况,使用 Observable.from
方法将 Future
转换为 Observable
更好)
以下示例来自我目前正在阅读的有关 Observables 的一本书:
object ObservablesCreate extends App {
val vms = Observable.create[String] { obs =>
 obs.onNext("JVM")
obs.onNext("DartVM")
obs.onNext("V8")
obs.onCompleted()
Subscription()
}
vms.subscribe(log _, e => log(s"oops - $e"), () => log("Done!"))
}
后面的文字暗示上面的代码片段有一个非常容易理解的同步订阅方法。我的问题是,这不是使用 Observables 进行异步事件发射的全部目的吗?我的意思是这里与 Iterable 没有区别。这是为了某种解释而解释的吗?
是的,只是为了解释create
工厂方法。在该片段中,您在订阅者订阅时同步生成元素。
但是您可以用同样的方式异步调用 onNext
,例如当 Future
解析时:
object ObservablesCreate extends App {
def future: Future[String] = ???
val vms = Observable.create[String] { obs =>
val f = future
f onComplete {
case Success(s) => {
obs.onNext(s)
obs.onCompleted()
}
case Failure(exception) => obs.onError(exception)
}
Subscription()
}
vms.subscribe(log _, e => log(s"oops - $e"), () => log("Done!"))
}
(对于这种特定情况,使用 Observable.from
方法将 Future
转换为 Observable
更好)