为什么这个使用 Future 的例子没有比基准运行得更快?

Why doesn't this example using Future run faster then the benchmark?

我试图理解 Futures,所以我写了一个 Summer class 来划分一个列表,在 n 个不同的 futures 中执行它,并组合结果。它比非分割版本慢 5 倍,我想知道为什么。这是我的基准:

import java.util.Date

object SummerMain {
    def main(args: Array[String]) = {
        val xs = List.fill(10000000)(1)

        println("Starting")

        val t = Timer()
        val x = xs.foldLeft(0)(_+_)
        val time = t.stop

        println(s"Sum: ${x}, time: ${time} ms")
    }
}

case class Timer(startTime: Long = new Date().getTime) {
    private def curMs: Long = new Date().getTime

    def restart: Timer = Timer(curMs)
    def stop: Long = curMs - startTime
    def lap: (Long, Timer) = { val curTime = curMs
        (curTime - startTime,Timer(curTime))
    }

}

平均运行时间约为 790 毫秒。

但这大约需要 4.5 秒:

import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

object SummerMain {
    def main(args: Array[String]) = {
        val s = Summer(
                    xs = List.fill(10000000)(1),
                    nParts = 5 // The number of futures to divide it over
                )

        println("Starting")

        val t = Timer()
        val x = s.breakSum
        val time = t.stop

        println(s"Sum: ${x}, time: ${time} ms")
    }
}

case class Summer(xs: List[Int], nParts: Int) {
    lazy val elemsPer = (xs.length / nParts) + 1

    def sum(xs: List[Int]): Long =
        xs.foldLeft(0)(_+_)

    def break(ys: List[Int]): List[List[Int]] = ys match {
        case Nil    => List()
        case zs     => (zs take elemsPer) :: break(zs drop elemsPer)
    }

    def breakSum: Long = {
        val futures: List[Future[Long]] = break(xs) map { ys =>
                Future( sum(ys) )
        }

        var s: Long = 0L

        for ( f <- futures ) {
            s += Await.result(f, 10 hours)
        }

        s
    }
}

是我的算法效率太低以至于它正在弥补收益,还是我使用 Future 不正确?

问题

break 方法效率极低。

case zs     => (zs take elemsPer) :: break(zs drop elemsPer)

这段代码创建了两个新列表,这本身比简单地对项目求和要花费更多时间。

可能的解决方案

将数字存储在索引结构(例如 Array 或 IndexedSeq)中,并将开始和结束索引传递给每个线程。线程应计算给定索引之间的总和,但来自 same 集合。

改进空间

    for ( f <- futures ) {
        s += Await.result(f, 10 hours)
    }

可以改进以上代码以利用最大并行度。

Future.reduce 合并结果,这可能会给出更好的结果:

val sum = Future.reduce(futures)(_ + _)
Await.result(sum, 10 hours)

您尝试并行化的 + 操作非常快

基本上,它只需要遍历所有元素的时间,因为整数 + 只需要 1 个 CPU 周期。无与伦比。

问题是仅仅打散原始列表的操作比仅仅对所有元素求和需要更多的时间:你需要分配新的内存并且你需要经过列表中的所有元素(将它们放入新列表中),这足以首先获得结果!

一旦列表被拆分,提交Runnable会触发创建5个线程,这是一个不自由的操作,从而带来新的开销。只有这样,并行化 可能 比非并发版本更快。

并行化代价高昂的操作时,并行化最有趣。在快速操作中,最好使用一个 CPU 的所有功能,同时使用其他的进行其他操作。