在 Scala 中将 Rx-Observables 转换为 Twitter Futures

Converting Rx-Observables to Twitter Futures in Scala


def convertScalaRXObservableToTwitterFuture[A](a: Observable[A]): TwitterFuture[A] = ???
def convertScalaRXObservableToTwitterFutureList[A](a: Observable[A]): TwitterFuture[List[A]] = ???

我看到了 this 篇有关相关主题的文章,但无法正常工作。

不幸的是,那篇文章中的说法是不正确的,ObservableFuture 之间不可能存在真正的双射。问题在于 Observable 是更强大的抽象,可以表示 Future 无法表示的事物。例如,Observable 实际上可能表示一个无限序列。例如,请参阅该文章中使用的 Observable.interval. Obviously there is no way to represent something like this with a Future. The Observable.toList 调用明确提到:

Returns a Single that emits a single item, a list composed of all the items emitted by the finite source ObservableSource.


Sources that are infinite and never complete will never emit anything through this operator and an infinite source may lead to a fatal OutOfMemoryError.

即使您将自己限制在有限的 Observable 中,仍然 Future 无法完全表达 Observable 的语义。考虑 Observable.intervalRange 在一段时间内一个一个地生成一个有限的范围。对于 Observable,第一个事件发生在 initialDelay 之后,然后每个 period 都会发生事件。使用 Future 您只能获得一个事件,并且必须仅在序列完全生成时才能完成,因此 Observable 已完成。这意味着通过将 Observable[A] 转换为 Future[List[A]] 你会立即破坏 Observable 的主要好处 - 反应性:你不能一个一个地处理事件,你必须在一个单一的过程中处理它们束。


convert between the two, without loosing asynchronous and event-driven nature of them.

是错误的,因为转换 Observable[A] -> Future[List[A]] 恰好丢失了 Observable 的 "event-driven nature" 并且没有办法解决这个问题。

P.S。实际上 Future 不如 Observable 强大这一事实并不令人感到意外。如果不是,为什么有人会首先创建 Observable