如何限制未来的执行?

How to throttle the execution of future?

我基本上有唯一 ID 列表,对于每个 ID,我都会调用 returns 未来的函数。 问题是单个调用中的期货数量是可变的。

list.map(id -> futureCall) 并行性过多会影响我的系统。我想配置并行执行的期货数量。 我想要可测试的设计,所以我做不到 this 经过大量搜索,我发现 this 我不明白如何使用它。我试过了,但没有用。

之后,我刚刚将它导入到我正在打电话的 class 中。 我使用了相同的代码片段并将默认的 maxConcurrent 设置为 4。 我用 ThrottledExecutionContext

替换了导入全局执行上下文

你必须用 ThrottledExecutionContext 包裹你的 ExecutionContext

这是一个小例子:

object TestApp extends App {

  implicit val ec = ThrottledExecutionContext(maxConcurrents = 10)(scala.concurrent.ExecutionContext.global)

  def futureCall(id:Int) = Future {
    println(s"executing $id")
    Thread.sleep(500)
    id
  }

  val list = 1 to 1000
  val results = list.map(futureCall)

  Await.result(Future.sequence(results), 100.seconds)
}

或者您也可以尝试 FixedThreadPool:

implicit val ec = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(10))

我不确定你想在这里做什么。默认 global ExecutionContext 使用与 CPU 内核一样多的线程。所以,这就是你的并行性。如果您仍然 "too many",您可以使用系统 属性 控制该数字:"scala.concurrent.context.maxThreads",并将其设置为较低的数字。

这将是在任何给定时间并行执行的最大期货数量。您不需要明确限制任何内容。

或者,您可以创建自己的执行器,并给它一个 BlockingQueue 容量有限的执行器。这会像您的实现一样在生产者方面(提交工作项时)阻塞,但我强烈建议您不要这样做,因为它非常危险并且容易出现死锁,而且效率也低得多,默认 ForkJoinPool 实现。