处理 scala futures 内存不足错误

handling scala futures out of memory error

我正在尝试使用 futures 使 scala 进程并行,

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))

val result = Future.traverse(listOfInputs) { input =>
    Future {
        // time consuming process
    }
}

result.onComplete {
    case Success(value) => display msg
    case Failure(exception) => throw exception
}

以上工作正常,当输入数量较少时,当输入数量增加时, 程序消耗更多内存,并且 OS 在执行期间终止进程。

有什么地方可以限制在 Scala 中使用的内存或线程? 感谢您的帮助。

您在示例中使用了一个固定大小的线程池,其中包含 5 个线程,并且您正在使用 Future { … } 构建 Futures,这意味着每个 Future 将占用一个线程,只要它是 运行。因此,在您的示例代码中,并行度已经严格限制为 5,当您的示例代码甚至无法证明问题时,很难对您的问题给出有用的答案。

也就是说,限制并行度的一种方法是使用信号量。信号量基本上是一组有限的做事许可,如果您编写代码以便它在开始工作之前获得许可并在完成后将其放回原处,这将限制您应用程序中的并行度。在 twitter 的 util-core 库中有一个信号量实现。

https://github.com/twitter/util

https://twitter.github.io/util/docs/com/twitter/concurrent/AsyncSemaphore.html

val sem = new AsyncSemaphore(5)
val result = Future.traverse(listOfInputs) { input =>
  sem.acquireAndRun {
    // create Future here.
  }
}

但是,我建议完全使用其他东西,因为 Scala Futures 太糟糕了。它们缺乏基本功能,例如异步 try/finally 构造或中断计算的能力。

ZIO 具有这些功能以及更多内置功能。例如,您可以使用 ZIO.foreachParN 方法遍历列表。它将限制并行性,如果您的计算之一因错误而失败,它将中止所有其他计算以避免浪费时间和内存。打开的文件等资源会自动释放