Scala Observable 的创建阻碍了我的未来
Scala Observable Creation Blocks My Futures
我想处理ea。异步查询提取(每个查询可能多次提取)。为了做到这一点,我将处理函数(returns a Future
)传递给我的查询方法来为 ea 调用它。拿来。我事先不知道查询的结果大小;我只知道我提取的最大大小。因此,我的查询 returns 一个 Observable
(与 List
相对,例如,我需要事先知道大小)。唯一的问题是,当我使用 Observable
create
或 apply
时,它会在内部阻塞,直到我的 Future
在调用下一个 onNext 之前完成——有效地删除了一个我希望从未来获得性能提升。 Observable
from
工厂方法不会阻塞,但它需要一个 Iterable
。我可以给它传递一个可变的 Iterable
并随着新的获取的到来而增长。有人有一个更透明的 sol'n 吗?这是代码:
object Repository {
def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Future[Set[Int]]] = {
// observable (as opposed to list) because modeling a process
// where the total result size is unknown beforehand.
// Also, not creating or applying because it blocks the futures
val mut = scala.collection.mutable.Set[Future[Set[Int]]]()
val obs = Observable.from(mut)
1 to 2100 by fetchSize foreach { i =>
mut += f(DataSource.fetch(i, fetchSize))
}
obs
}
}
我能够使用 foldLeft 消除可变性:
(1 to 21 by fetchSize).foldLeft(Observable just Future((Set[Int]()))) { (obs, i) =>
obs + f(DataSource.fetch(i)())
}
其中:
implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) {
def +(future: Future[Set[Int]]) =
obs merge (Observable just future)
}
唯一的问题是我不喜欢我必须做的创建一个编译器没有抱怨的空 Observable
。如果谁有更好的答案,请post点个赞,我会标注的
我想处理ea。异步查询提取(每个查询可能多次提取)。为了做到这一点,我将处理函数(returns a Future
)传递给我的查询方法来为 ea 调用它。拿来。我事先不知道查询的结果大小;我只知道我提取的最大大小。因此,我的查询 returns 一个 Observable
(与 List
相对,例如,我需要事先知道大小)。唯一的问题是,当我使用 Observable
create
或 apply
时,它会在内部阻塞,直到我的 Future
在调用下一个 onNext 之前完成——有效地删除了一个我希望从未来获得性能提升。 Observable
from
工厂方法不会阻塞,但它需要一个 Iterable
。我可以给它传递一个可变的 Iterable
并随着新的获取的到来而增长。有人有一个更透明的 sol'n 吗?这是代码:
object Repository {
def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Future[Set[Int]]] = {
// observable (as opposed to list) because modeling a process
// where the total result size is unknown beforehand.
// Also, not creating or applying because it blocks the futures
val mut = scala.collection.mutable.Set[Future[Set[Int]]]()
val obs = Observable.from(mut)
1 to 2100 by fetchSize foreach { i =>
mut += f(DataSource.fetch(i, fetchSize))
}
obs
}
}
我能够使用 foldLeft 消除可变性:
(1 to 21 by fetchSize).foldLeft(Observable just Future((Set[Int]()))) { (obs, i) =>
obs + f(DataSource.fetch(i)())
}
其中:
implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) {
def +(future: Future[Set[Int]]) =
obs merge (Observable just future)
}
唯一的问题是我不喜欢我必须做的创建一个编译器没有抱怨的空 Observable
。如果谁有更好的答案,请post点个赞,我会标注的