我如何将一个具有函数的 Observable 从 Future 映射到 Future?
How do I map an Observable with a function from Future to Future?
假设我有一个元素类型为 In
:
的事件流
val observableIn: Observable[In] = ???
还有一个将In
类型的对象转换为Out
类型对象的函数,但是"in the future":
val futureInToFutureOut: (Future[In]) => Future[Out] = ???
此时我想根据我的函数futureInToFutureOut
转换我的observableIn
的元素。也就是说,我想要一个 Out
类型元素的事件流作为结果,与原始流的元素匹配,但通过函数 futureInToFutureOut
.
转换
我认为这应该可行:
val observableOut: Observable[Out] = observableIn flatMap { in =>
Observable.from(futureInToFutureOut(Future(in)))
}
这样对吗?有更好的方法吗?
编辑:
据我所知,您的解决方案是正确的。如果您想稍微提高性能,请考虑:
val observableOut: Observable[Out] = observableIn.flatMap { in =>
val p = Promise.successful(in)
Observable.from(futureInToFutureOut(p.future))
}
这有点快,因为它不会像 Future.apply
does 那样创建异步计算来映射未来。
旧:
我在下面留下我的旧建议,它只适用于您在 Observable
中映射单个事件的情况。
import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
promiseIn.future.map(futureInToFutureOut).foreach { y =>
observer.onNext(y)
observer.onCompleted()
}
}
说明
由于您是从 Observable[In]
对象(即事件流)开始的,因此您需要找到一种方法将事件从 Observable
转移到未来。创建新未来的典型方法是首先创建其 Promise
-- input side of the future object。然后,您在 Observable
上使用 foreach
以在第一个事件到达时在未来调用 trySuccess
:
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
一旦 Observable
上的事件到达,promise 将异步完成。我们现在可以通过调用它的 future
方法来获取承诺的读取端,即未来;然后在未来调用 map
-- promiseIn.future.map(futureInToFutureOut)
。图形化:
promiseIn.future ---x---> map ---f(x)---> futureOut
生成的未来是用 futureInToFutureOut(x)
异步完成的。在这一点上,我们需要找到一种方法通过 Observable[Out]
将这个值发回。创建新 Observable
的典型方法是调用 Observable.create
工厂方法。此方法给出了 Observable
的写入端 -- Observer
,我们通过调用 onNext
:
来向其发出事件
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
因为我们知道未来最多发出一个事件,所以我们在观察者上调用 onCompleted
来关闭输出 Observable
。
编辑:如果您想掌握 Rx 和 Scala 期货,您可能需要考虑处理这些主题的 this book。免责声明:我是作者。
假设我有一个元素类型为 In
:
val observableIn: Observable[In] = ???
还有一个将In
类型的对象转换为Out
类型对象的函数,但是"in the future":
val futureInToFutureOut: (Future[In]) => Future[Out] = ???
此时我想根据我的函数futureInToFutureOut
转换我的observableIn
的元素。也就是说,我想要一个 Out
类型元素的事件流作为结果,与原始流的元素匹配,但通过函数 futureInToFutureOut
.
我认为这应该可行:
val observableOut: Observable[Out] = observableIn flatMap { in =>
Observable.from(futureInToFutureOut(Future(in)))
}
这样对吗?有更好的方法吗?
编辑:
据我所知,您的解决方案是正确的。如果您想稍微提高性能,请考虑:
val observableOut: Observable[Out] = observableIn.flatMap { in =>
val p = Promise.successful(in)
Observable.from(futureInToFutureOut(p.future))
}
这有点快,因为它不会像 Future.apply
does 那样创建异步计算来映射未来。
旧:
我在下面留下我的旧建议,它只适用于您在 Observable
中映射单个事件的情况。
import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
promiseIn.future.map(futureInToFutureOut).foreach { y =>
observer.onNext(y)
observer.onCompleted()
}
}
说明
由于您是从 Observable[In]
对象(即事件流)开始的,因此您需要找到一种方法将事件从 Observable
转移到未来。创建新未来的典型方法是首先创建其 Promise
-- input side of the future object。然后,您在 Observable
上使用 foreach
以在第一个事件到达时在未来调用 trySuccess
:
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
一旦 Observable
上的事件到达,promise 将异步完成。我们现在可以通过调用它的 future
方法来获取承诺的读取端,即未来;然后在未来调用 map
-- promiseIn.future.map(futureInToFutureOut)
。图形化:
promiseIn.future ---x---> map ---f(x)---> futureOut
生成的未来是用 futureInToFutureOut(x)
异步完成的。在这一点上,我们需要找到一种方法通过 Observable[Out]
将这个值发回。创建新 Observable
的典型方法是调用 Observable.create
工厂方法。此方法给出了 Observable
的写入端 -- Observer
,我们通过调用 onNext
:
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
因为我们知道未来最多发出一个事件,所以我们在观察者上调用 onCompleted
来关闭输出 Observable
。
编辑:如果您想掌握 Rx 和 Scala 期货,您可能需要考虑处理这些主题的 this book。免责声明:我是作者。