Scala - 来自 Futures 的批处理流
Scala - Batched Stream from Futures
我有一个案例 class Thing
的实例,我有一堆 运行 的查询 return Thing
的集合像这样:
def queries: Seq[Future[Seq[Thing]]]
我需要从所有未来(如上)收集所有 Thing
s 并将它们分组为大小相同的 10,000 个集合,以便它们可以序列化为 10,000 个 Thing
s 的文件。
def serializeThings(Seq[Thing]): Future[Unit]
我希望它的实现方式是在序列化之前不等待对 运行 的所有查询。在第一个查询的未来完成后,一旦有 10,000 Thing
s returned,我想开始序列化。
如果我这样做:
Future.sequence(queries)
它将收集所有查询的结果,但我的理解是,在所有查询完成之前不会调用像 map
这样的操作,并且所有 Thing
必须适合内存一次。
使用 Scala 集合和并发库实现批处理流管道的最佳方法是什么?
在你做之前 Future.sequence
做你想做的个人未来然后使用 Future.sequence
.
//this can be used for serializing
def doSomething(): Unit = ???
//do something with the failed future
def doSomethingElse(): Unit = ???
def doSomething(list: List[_]) = ???
val list: List[Future[_]] = List.fill(10000)(Future(doSomething()))
val newList =
list.par.map { f =>
f.map { result =>
doSomething()
}.recover { case throwable =>
doSomethingElse()
}
}
Future.sequence(newList).map ( list => doSomething(list)) //wait till all are complete
您可以使用 Future.traverse
而不是 newList
代
Future.traverse(list)(f => f.map( x => doSomething()).recover {case th => doSomethingElse() }).map ( completeListOfValues => doSomething(completeListOfValues))
我想我成功了。解决方案是基于我之前的。它从 Future[List[Thing]]
个结果中收集结果,直到达到 BatchSize
的阈值。然后它调用 serializeThings
future,当它完成时,循环继续其余部分。
object BatchFutures extends App {
case class Thing(id: Int)
def getFuture(id: Int): Future[List[Thing]] = {
Future.successful {
List.fill(3)(Thing(id))
}
}
def serializeThings(things: Seq[Thing]): Future[Unit] = Future.successful {
//Thread.sleep(2000)
println("processing: " + things)
}
val ids = (1 to 4).toList
val BatchSize = 5
val future = ids.foldLeft(Future.successful[List[Thing]](Nil)) {
case (acc, id) =>
acc flatMap { processed =>
getFuture(id) flatMap { res =>
val all = processed ++ res
val (batch, rest) = all.splitAt(5)
if (batch.length == BatchSize) { // if futures filled the batch with needed amount
serializeThings(batch) map { _ =>
rest // process the rest
}
} else {
Future.successful(all) //if we need more Things for a batch
}
}
}
}.flatMap { rest =>
serializeThings(rest)
}
Await.result(future, Duration.Inf)
}
结果打印:
processing: List(Thing(1), Thing(1), Thing(1), Thing(2), Thing(2))
processing: List(Thing(2), Thing(3), Thing(3), Thing(3), Thing(4))
processing: List(Thing(4), Thing(4))
当 Thing
的数量不能被 BatchSize
整除时,我们必须再次调用 serializeThings
(最后一次 flatMap
)。我希望它有所帮助! :)
我有一个案例 class Thing
的实例,我有一堆 运行 的查询 return Thing
的集合像这样:
def queries: Seq[Future[Seq[Thing]]]
我需要从所有未来(如上)收集所有 Thing
s 并将它们分组为大小相同的 10,000 个集合,以便它们可以序列化为 10,000 个 Thing
s 的文件。
def serializeThings(Seq[Thing]): Future[Unit]
我希望它的实现方式是在序列化之前不等待对 运行 的所有查询。在第一个查询的未来完成后,一旦有 10,000 Thing
s returned,我想开始序列化。
如果我这样做:
Future.sequence(queries)
它将收集所有查询的结果,但我的理解是,在所有查询完成之前不会调用像 map
这样的操作,并且所有 Thing
必须适合内存一次。
使用 Scala 集合和并发库实现批处理流管道的最佳方法是什么?
在你做之前 Future.sequence
做你想做的个人未来然后使用 Future.sequence
.
//this can be used for serializing
def doSomething(): Unit = ???
//do something with the failed future
def doSomethingElse(): Unit = ???
def doSomething(list: List[_]) = ???
val list: List[Future[_]] = List.fill(10000)(Future(doSomething()))
val newList =
list.par.map { f =>
f.map { result =>
doSomething()
}.recover { case throwable =>
doSomethingElse()
}
}
Future.sequence(newList).map ( list => doSomething(list)) //wait till all are complete
您可以使用 Future.traverse
newList
代
Future.traverse(list)(f => f.map( x => doSomething()).recover {case th => doSomethingElse() }).map ( completeListOfValues => doSomething(completeListOfValues))
我想我成功了。解决方案是基于我之前的Future[List[Thing]]
个结果中收集结果,直到达到 BatchSize
的阈值。然后它调用 serializeThings
future,当它完成时,循环继续其余部分。
object BatchFutures extends App {
case class Thing(id: Int)
def getFuture(id: Int): Future[List[Thing]] = {
Future.successful {
List.fill(3)(Thing(id))
}
}
def serializeThings(things: Seq[Thing]): Future[Unit] = Future.successful {
//Thread.sleep(2000)
println("processing: " + things)
}
val ids = (1 to 4).toList
val BatchSize = 5
val future = ids.foldLeft(Future.successful[List[Thing]](Nil)) {
case (acc, id) =>
acc flatMap { processed =>
getFuture(id) flatMap { res =>
val all = processed ++ res
val (batch, rest) = all.splitAt(5)
if (batch.length == BatchSize) { // if futures filled the batch with needed amount
serializeThings(batch) map { _ =>
rest // process the rest
}
} else {
Future.successful(all) //if we need more Things for a batch
}
}
}
}.flatMap { rest =>
serializeThings(rest)
}
Await.result(future, Duration.Inf)
}
结果打印:
processing: List(Thing(1), Thing(1), Thing(1), Thing(2), Thing(2))
processing: List(Thing(2), Thing(3), Thing(3), Thing(3), Thing(4))
processing: List(Thing(4), Thing(4))
当 Thing
的数量不能被 BatchSize
整除时,我们必须再次调用 serializeThings
(最后一次 flatMap
)。我希望它有所帮助! :)