在 Scala 中将 Rx-Observables 转换为 Twitter Futures

Converting Rx-Observables to Twitter Futures in Scala

我想以最主动的方式实现以下功能。我需要这些来实现上述类型之间自动转换的双射。

def convertScalaRXObservableToTwitterFuture[A](a: Observable[A]): TwitterFuture[A] = ???
def convertScalaRXObservableToTwitterFutureList[A](a: Observable[A]): TwitterFuture[List[A]] = ???

我看到了 this 篇有关相关主题的文章,但无法正常工作。

不幸的是,那篇文章中的说法是不正确的,ObservableFuture 之间不可能存在真正的双射。问题在于 Observable 是更强大的抽象,可以表示 Future 无法表示的事物。例如,Observable 实际上可能表示一个无限序列。例如,请参阅该文章中使用的 Observable.interval. Obviously there is no way to represent something like this with a Future. The Observable.toList 调用明确提到:

Returns a Single that emits a single item, a list composed of all the items emitted by the finite source ObservableSource.

然后它说:

Sources that are infinite and never complete will never emit anything through this operator and an infinite source may lead to a fatal OutOfMemoryError.

即使您将自己限制在有限的 Observable 中,仍然 Future 无法完全表达 Observable 的语义。考虑 Observable.intervalRange 在一段时间内一个一个地生成一个有限的范围。对于 Observable,第一个事件发生在 initialDelay 之后,然后每个 period 都会发生事件。使用 Future 您只能获得一个事件,并且必须仅在序列完全生成时才能完成,因此 Observable 已完成。这意味着通过将 Observable[A] 转换为 Future[List[A]] 你会立即破坏 Observable 的主要好处 - 反应性:你不能一个一个地处理事件,你必须在一个单一的过程中处理它们束。

总结文章第一段的主张:

convert between the two, without loosing asynchronous and event-driven nature of them.

是错误的,因为转换 Observable[A] -> Future[List[A]] 恰好丢失了 Observable 的 "event-driven nature" 并且没有办法解决这个问题。

P.S。实际上 Future 不如 Observable 强大这一事实并不令人感到意外。如果不是,为什么有人会首先创建 Observable