可从 Futures 观察到 - 来自多个线程的 onNext
Observable from Futures - onNext from multiple threads
我想根据 Futures
列表的结果实时生成一个 Observable
。
在最简单的情况下,假设我有一个期货列表,我是 运行 Future.sequence
,我只是用 Observable
来监控它们的进展,它告诉我每个一次完成。我基本上是这样做的:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
这在我的测试环境中运行良好。但是我刚刚读到 onNext
不应该从不同的线程调用,至少没有注意没有重叠调用。解决此问题的推荐方法是什么?似乎 Observables
的许多实际用途都需要从这样的异步代码调用 onNext
,但我在文档中找不到类似的示例。
Observables must issue notifications to observers serially (not in
parallel). They may issue these notifications from different threads,
but there must be a formal happens-before relationship between the
notifications.
It is possible for an Observable to invoke its observers’ methods
asynchronously, perhaps from different threads. This could make such
an Observable violate the Observable contract, in that it might try to
send an OnCompleted or OnError notification before one of its OnNext
notifications, or it might make an OnNext notification from two
different threads concurrently. You can force such an Observable to be
well-behaved and synchronous by applying the Serialize operator to it.
我想根据 Futures
列表的结果实时生成一个 Observable
。
在最简单的情况下,假设我有一个期货列表,我是 运行 Future.sequence
,我只是用 Observable
来监控它们的进展,它告诉我每个一次完成。我基本上是这样做的:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
这在我的测试环境中运行良好。但是我刚刚读到 onNext
不应该从不同的线程调用,至少没有注意没有重叠调用。解决此问题的推荐方法是什么?似乎 Observables
的许多实际用途都需要从这样的异步代码调用 onNext
,但我在文档中找不到类似的示例。
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
It is possible for an Observable to invoke its observers’ methods asynchronously, perhaps from different threads. This could make such an Observable violate the Observable contract, in that it might try to send an OnCompleted or OnError notification before one of its OnNext notifications, or it might make an OnNext notification from two different threads concurrently. You can force such an Observable to be well-behaved and synchronous by applying the Serialize operator to it.