使用 ScalaMeter 测试并行操作的性能
Testing performance of parallel actions with ScalaMeter
我使用了取自 ScalaMeter 示例存储库 (https://github.com/scalameter/scalameter-examples) 的 basic
示例。示例代码如下:
measure method "map" in {
using(ranges) in { r =>
r.map(_ + 1)
}
}
在我的例子中,我想测试 运行 一些并行操作的加速。一个简单的例子可能是,我划分了一个必须映射的范围。我并行使用了从 Java8
到 运行 的 ExecutorService
任务。它看起来像这样:
val cores = Runtime.getRuntime.availableProcessors()
val pool = Executors.newFixedThreadPool(cores)
measure method "parallel map" in {
using(ranges) in { r =>
val tasks = (0 until cores).map { t =>
new Callable[Unit] {
override def call() = (0 + t until r.last by cores).map(_ + 1)
}
}
import collection.JavaConverters._
pool.invokeAll(tasks.asJava)
}
}
问题虽然并行测试完成(你可以看到时间结果)它没有return退出代码。这意味着如果我将 Bench.LocalTime
更改为 Bench.ForkedTime
即使结果也消失了。我很困惑发生了什么。
有什么想法吗?
好的,这很简单,因为我忘记了 shutdown()
池。在 invokeAll
之后添加它后,我得到如下结果:
measure method "parallel map" in {
using(ranges) in { r =>
val pool = Executors.newFixedThreadPool(cores)
val tasks = (0 until cores).map { t =>
new Callable[Unit] {
override def call() = (0 + t until r.last by cores).map(_ + 1)
}
}
pool.invokeAll(tasks.asJava)
pool.shutdown()
}
}
唯一的问题是,现在不仅要测量操作,还要测量 ExecutorService
的创建时间和关闭它。但我想暂时不会改变结果。
实际上,一段时间后我发现了一种更简单、更 'Scala' 的方法来完成上述操作。您可以简单地创建一个任务列表作为要创建的函数列表(或者仍然可以是 Callables
的列表),然后使用 parallel
集合调用所有任务。代码如下所示:
measure method "parallel map" in {
using(ranges) in { r =>
val tasks = (0 until cores).flatMap { t =>
(0 + t until r by cores).map(i => () => i+1)
}
tasks.par.map(_.apply())
}
}
甚至更简单,因为任务列表不关心 cores
现在:
val tasks = (0 until r).map(i => () => i+1)
我使用了取自 ScalaMeter 示例存储库 (https://github.com/scalameter/scalameter-examples) 的 basic
示例。示例代码如下:
measure method "map" in {
using(ranges) in { r =>
r.map(_ + 1)
}
}
在我的例子中,我想测试 运行 一些并行操作的加速。一个简单的例子可能是,我划分了一个必须映射的范围。我并行使用了从 Java8
到 运行 的 ExecutorService
任务。它看起来像这样:
val cores = Runtime.getRuntime.availableProcessors()
val pool = Executors.newFixedThreadPool(cores)
measure method "parallel map" in {
using(ranges) in { r =>
val tasks = (0 until cores).map { t =>
new Callable[Unit] {
override def call() = (0 + t until r.last by cores).map(_ + 1)
}
}
import collection.JavaConverters._
pool.invokeAll(tasks.asJava)
}
}
问题虽然并行测试完成(你可以看到时间结果)它没有return退出代码。这意味着如果我将 Bench.LocalTime
更改为 Bench.ForkedTime
即使结果也消失了。我很困惑发生了什么。
有什么想法吗?
好的,这很简单,因为我忘记了 shutdown()
池。在 invokeAll
之后添加它后,我得到如下结果:
measure method "parallel map" in {
using(ranges) in { r =>
val pool = Executors.newFixedThreadPool(cores)
val tasks = (0 until cores).map { t =>
new Callable[Unit] {
override def call() = (0 + t until r.last by cores).map(_ + 1)
}
}
pool.invokeAll(tasks.asJava)
pool.shutdown()
}
}
唯一的问题是,现在不仅要测量操作,还要测量 ExecutorService
的创建时间和关闭它。但我想暂时不会改变结果。
实际上,一段时间后我发现了一种更简单、更 'Scala' 的方法来完成上述操作。您可以简单地创建一个任务列表作为要创建的函数列表(或者仍然可以是 Callables
的列表),然后使用 parallel
集合调用所有任务。代码如下所示:
measure method "parallel map" in {
using(ranges) in { r =>
val tasks = (0 until cores).flatMap { t =>
(0 + t until r by cores).map(i => () => i+1)
}
tasks.par.map(_.apply())
}
}
甚至更简单,因为任务列表不关心 cores
现在:
val tasks = (0 until r).map(i => () => i+1)