Scala - 来自 Futures 的批处理流

Scala - Batched Stream from Futures

我有一个案例 class Thing 的实例,我有一堆 运行 的查询 return Thing 的集合像这样:

def queries: Seq[Future[Seq[Thing]]]

我需要从所有未来(如上)收集所有 Things 并将它们分组为大小相同的 10,000 个集合,以便它们可以序列化为 10,000 个 Things 的文件。

def serializeThings(Seq[Thing]): Future[Unit]

我希望它的实现方式是在序列化之前不等待对 运行 的所有查询。在第一个查询的未来完成后,一旦有 10,000 Things returned,我想开始序列化。

如果我这样做:

Future.sequence(queries)

它将收集所有查询的结果,但我的理解是,在所有查询完成之前不会调用像 map 这样的操作,并且所有 Thing 必须适合内存一次。

使用 Scala 集合和并发库实现批处理流管道的最佳方法是什么?

在你做之前 Future.sequence 做你想做的个人未来然后使用 Future.sequence.

//this can be used for serializing
def doSomething(): Unit = ???

//do something with the failed future
def doSomethingElse(): Unit = ???

def doSomething(list: List[_]) = ???

val list: List[Future[_]] = List.fill(10000)(Future(doSomething()))

val newList = 
list.par.map { f =>
  f.map { result =>
    doSomething()
  }.recover { case throwable =>
    doSomethingElse()
  }
}

Future.sequence(newList).map ( list => doSomething(list)) //wait till all are complete

您可以使用 Future.traverse

而不是 newList
Future.traverse(list)(f => f.map( x => doSomething()).recover {case th =>  doSomethingElse() }).map ( completeListOfValues => doSomething(completeListOfValues))

我想我成功了。解决方案是基于我之前的。它从 Future[List[Thing]] 个结果中收集结果,直到达到 BatchSize 的阈值。然后它调用 serializeThings future,当它完成时,循环继续其余部分。

object BatchFutures extends App {

  case class Thing(id: Int)

  def getFuture(id: Int): Future[List[Thing]] = {
    Future.successful {
      List.fill(3)(Thing(id))
    }
  }

  def serializeThings(things: Seq[Thing]): Future[Unit] = Future.successful {
    //Thread.sleep(2000)
    println("processing: " + things)
  }

  val ids = (1 to 4).toList
  val BatchSize = 5

  val future = ids.foldLeft(Future.successful[List[Thing]](Nil)) {
    case (acc, id) =>
      acc flatMap { processed =>
        getFuture(id) flatMap { res =>
          val all = processed ++ res
          val (batch, rest) = all.splitAt(5)

          if (batch.length == BatchSize) { // if futures filled the batch with needed amount
            serializeThings(batch) map { _ =>
              rest // process the rest
            }
          } else {
            Future.successful(all) //if we need more Things for a batch
          }
        }
      }
  }.flatMap { rest =>
    serializeThings(rest)
  }

  Await.result(future, Duration.Inf)

}

结果打印:

processing: List(Thing(1), Thing(1), Thing(1), Thing(2), Thing(2))
processing: List(Thing(2), Thing(3), Thing(3), Thing(3), Thing(4))
processing: List(Thing(4), Thing(4))

Thing 的数量不能被 BatchSize 整除时,我们必须再次调用 serializeThings(最后一次 flatMap)。我希望它有所帮助! :)