我如何将一个具有函数的 Observable 从 Future 映射到 Future?

How do I map an Observable with a function from Future to Future?

假设我有一个元素类型为 In:

的事件流
val observableIn: Observable[In] = ???

还有一个将In类型的对象转换为Out类型对象的函数,但是"in the future":

val futureInToFutureOut: (Future[In]) => Future[Out] = ???

此时我想根据我的函数futureInToFutureOut转换我的observableIn的元素。也就是说,我想要一个 Out 类型元素的事件流作为结果,与原始流的元素匹配,但通过函数 futureInToFutureOut.

转换

我认为这应该可行:

val observableOut: Observable[Out] = observableIn flatMap { in =>
  Observable.from(futureInToFutureOut(Future(in)))
}

这样对吗?有更好的方法吗?

编辑:

据我所知,您的解决方案是正确的。如果您想稍微提高性能,请考虑:

val observableOut: Observable[Out] = observableIn.flatMap { in =>
  val p = Promise.successful(in)
  Observable.from(futureInToFutureOut(p.future))
}

这有点快,因为它不会像 Future.apply does 那样创建异步计算来映射未来。

旧:

我在下面留下我的旧建议,它只适用于您在 Observable 中映射单个事件的情况。

import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
  promiseIn.future.map(futureInToFutureOut).foreach { y =>
    observer.onNext(y)
    observer.onCompleted()
  }
}

说明

由于您是从 Observable[In] 对象(即事件流)开始的,因此您需要找到一种方法将事件从 Observable 转移到未来。创建新未来的典型方法是首先创建其 Promise -- input side of the future object。然后,您在 Observable 上使用 foreach 以在第一个事件到达时在未来调用 trySuccess

observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)

一旦 Observable 上的事件到达,promise 将异步完成。我们现在可以通过调用它的 future 方法来获取承诺的读取端,即未来;然后在未来调用 map -- promiseIn.future.map(futureInToFutureOut)。图形化:

promiseIn.future ---x---> map ---f(x)---> futureOut

生成的未来是用 futureInToFutureOut(x) 异步完成的。在这一点上,我们需要找到一种方法通过 Observable[Out] 将这个值发回。创建新 Observable 的典型方法是调用 Observable.create 工厂方法。此方法给出了 Observable 的写入端 -- Observer,我们通过调用 onNext:

来向其发出事件
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))

因为我们知道未来最多发出一个事件,所以我们在观察者上调用 onCompleted 来关闭输出 Observable

编辑:如果您想掌握 Rx 和 Scala 期货,您可能需要考虑处理这些主题的 this book。免责声明:我是作者。