"slow" Future.traverse 版本有内建吗?

Is there a build in "slow" Future.traverse version?

我发现为一个用户请求构建大量 Future 通常是一种不好的做法。这些 Futures 可以填充将影响其他请求的执行上下文。这不太可能是您真正想要的。保持 Futures 数量少非常简单——仅在 for-comprehensions 中创建新的 Futures,使用 flatMap 等。但有时可能有必要为每个 Seq 项目创建一个 Futures。使用 Future.sequence 或 Future.traverse 会导致上述问题。所以我最终得到了这个解决方案,它不会同时为每个集合项目创建 Futures:

  def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    if(xs.isEmpty) Future successful Seq.empty[B]
    else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) }
  }

我想知道,也许我正在发明一个轮子,而实际上这样的函数已经存在于 Scala 的标准库中的某个地方?另外我想知道,你有没有遇到描述的问题,你是如何解决的?也许,如果这是 Futures 的一个众所周知的问题,我应该在 Future.scala 中创建一个拉取请求,这样这个函数(或它的更通用的版本)就会包含在标准库中?

UPD:更通用的版本,并行度有限:

  def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    val xss = xs.grouped(chunkSize).toList
    val chunks = xss.take(maxChunks-1) :+ xss.drop(maxChunks-1).flatten
    Future.sequence{ chunks.map(chunk => ftraverse(chunk)(f) ) } map { _.flatten }
  } 

不,标准库中没有这样的东西。 应该有没有,我不好说。我不认为想要以严格的顺序执行 Futures 是很常见的。但是,如果您愿意,可以很容易地实现自己的方法来实现这一点,正如您所拥有的那样。为此,我个人只是在自己的库中保留了一个方法。但是,如果有一种方法可以使用标准库来执行此操作,那将很方便。如果有 ,它应该更通用。

修改当前的traverse顺序处理Future其实很简单,而不是并行处理。这是 current version,它使用 foldLeft 而不是递归:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      val fb = fn(a)
      for (r <- fr; b <- fb) yield (r += b)
    }.map(_.result())

Future 通过分配 val fb = fn(a)flatMap 之前创建(因此在之前执行)。需要做的就是将 fn(a) 移到 flatMap 中以延迟集合中后续 Future 的创建。

def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      for (r <- fr; b <- fn(a)) yield (r += b)
    }.map(_.result())

另一种可以限制执行大量 Future 的影响的方法是为它们使用不同的 ExecutionContext。例如,在 Web 应用程序中,我可能保留一个 ExecutionContext 用于数据库调用,一个用于调用 Amazon S3,另一个用于慢速数据库调用。

一个非常简单的实现可以使用固定线程池:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

此处执行的大量 Future 将填充 ExecutionContext,但会阻止它们填充其他上下文。

如果您使用的是 Akka,则可以在 ActorSystem:

中使用 Dispatchers 从配置轻松创建 ExecutionContexts
my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 100
}

如果您有一个名为 systemActorSystem,您可以通过以下方式访问它:

implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

所有这些都取决于您的用例。虽然我确实将我的异步计算分离到不同的上下文中,但有时我仍然想 traverse 顺序地平滑这些上下文的使用。

看来您的问题与您创建的期货数量无关,而与它们执行的公平性有关。考虑 futures 的回调(mapflatMaponCompletefold 等)是如何处理的:它们被放置在执行者的队列中,并在它们的结果出现时执行父期货已完成。

如果你所有的期货共享同一个执行者(即队列),他们确实会像你说的那样互相破坏。解决这个公平性问题的常用方法是使用 Akka actors。对于每个请求,启动一个新的 actor(具有自己的队列)并让所有该类型 的 actor 共享一个 ExecutionContext。您可以使用 throughput 配置 属性 限制一个 actor 在转移到另一个共享 ExecutionContext 的 actor 之前执行的最大消息数。

平行合集不就是为了这个吗?

val parArray = (1 to 1000000).toArray.par
sum = parArray.map(_ + _)
res0: Int = 1784293664

看起来像一个普通的同步方法调用,但并行集合将使用线程池并行计算映射(竞争条件!)。您可以在此处找到更多详细信息:http://docs.scala-lang.org/overviews/parallel-collections/overview.html

假设 futures 的创建不是那么细粒度以至于开销会令人望而却步(在这种情况下,建议使用并行集合的答案可能是最有用的),您可以创建一个不同的,隐式的为 运行 的期货定义执行上下文,由不同的执行者支持,它有自己的线程。

您可以致电 ExecutionContext.fromExecutorServiceExecutionContext.fromExecutor 进行此操作。