cats-effect:使用 parSequence 时无法看到执行时间的减少
cats-effect: Unable to see decrease in execution time when using `parSequence`
我是 cats-effect 库的新手,我 运行 遇到了并行执行的问题。我有一个我认为会受益的应用程序,但是当我在玩具构造上测试这个想法时,我似乎看不出执行时间有什么不同。我觉得我一定错过了一些对其他人来说显而易见的东西,所以我想我会试试运气。在下面的代码中,我有两个跨数字序列求和的实现(addInSequence
和 addInParallel
),它们都在 run()
函数中执行。当我执行 运行 程序时,我注意到它们的 运行 时间几乎相同。我是否遗漏了一些明显的东西?
import cats.Monoid
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
case class Result[A](value: A, secondsElapsed: Double)
object Result {
def total[A](results: Seq[Result[A]])(implicit mon: Monoid[A]): Result[A] = {
val out: Result[A] = results.foldLeft(Result.empty[A]) {
(out: Result[A], next: Result[A]) =>
val newValue: A = mon.combine(out.value, next.value)
val aggTime: Double = out.secondsElapsed + next.secondsElapsed
Result(newValue, aggTime)
}
out
}
def empty[A](implicit mon: Monoid[A]): Result[A] = Result(mon.empty, 0d)
implicit val intAddMon: Monoid[Int] = new Monoid[Int] {
override def empty: Int = 0
override def combine(x: Int, y: Int): Int = x + y
}
}
object ParallelMap extends IOApp {
def slowAdd(nums: Seq[Int]): Int = nums.foldLeft(0) {
(out: Int, next: Int) =>
val seconds: TimeUnit = java.util.concurrent.TimeUnit.SECONDS
val delay: IO[Unit] = IO.sleep(FiniteDuration(1L, seconds))
delay.unsafeRunSync()
out + next
}
def timeIt[A](op: => A): Result[A] = {
val start: Double = System.nanoTime / 1e9
val out: A = op
val stop: Double = System.nanoTime / 1e9
Result(out, stop - start)
}
def addInSequence(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val partialSums: Seq[Result[Int]] = Seq(first, second, third).map( ns => timeIt(slowAdd(ns)) )
IO(Result.total(partialSums))
}
def addInParallel(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val ioSeq: List[IO[Result[Int]]] = List(first, second, third).map( ns => IO(timeIt(slowAdd(ns))) )
val sums: IO[List[Result[Int]]] = ioSeq.parSequence
for {
partialSums <- sums
} yield Result.total(partialSums)
}
override def run(args: List[String]): IO[ExitCode] = {
val nums: Seq[Int] = 1 to 4
val results: IO[Seq[(String, Result[Int])]] = for {
serial <- addInSequence(nums, nums, nums)
parallel <- addInParallel(nums, nums, nums)
} yield Seq(("Serial", serial), ("Parallel", parallel))
val report: IO[Unit] = results.map(println)
report.unsafeRunSync()
IO(ExitCode.Success)
}
}
似乎我应该看到 运行时间减少了三分之一,但我必须以某种方式限制并行执行的能力。但是,docs 似乎没有暗示需要任何额外的设置,我遇到的任何其他示例也没有。任何想法将不胜感激。
两件事:
并不能保证并行操作总是更快。如果您的顺序操作很短,那么从分派到多个线程以及稍后收集所有结果的开销可能大于加速比。[=13=]
看看你测量的是什么。您有一个执行 X 工作量的顺序操作,或 3 个执行 X/3 工作量的操作。您测量所有这些然后比较:运行ning X 顺序的时间与 运行ning X/3 的总时间 3 个任务的工作量。如果顺序 运行 花费了大约 3 秒,而每个并行 运行 花费了大约 1 秒,那么按照这个逻辑,两个版本都需要 3 秒。这可能适用于我们测量 CPU 使用时间,但如果我们测量从所有工作开始到完成的时间,则不完全正确。
如果我运行你的代码我得到
@ ParallelMap.main(Array[String]())
List((Serial,Result(30,12.058612958004232)), (Parallel,Result(30,12.005087116995128)))
但是,如果我 运行 此代码改为:
object ParallelMap extends IOApp {
def slowAdd(nums: Seq[Int]): Int = nums.foldLeft(0) {
(out: Int, next: Int) =>
val seconds: TimeUnit = java.util.concurrent.TimeUnit.SECONDS
val delay: IO[Unit] = IO.sleep(FiniteDuration(1L, seconds))
delay.unsafeRunSync()
out + next
}
def timeIO[A](op: IO[A]): IO[Result[A]] = for {
start <- IO(System.nanoTime / 1e9)
out <- op
stop = System.nanoTime / 1e9
} yield Result(out, stop - start)
def addInSequence(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
timeIO(IO(List(first, second, third).map(ns => slowAdd(ns)).sum))
}
def addInParallel(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
// I changed is as little as possible so that you would still see
// similarity to your code, but normally I would write
// .parTraverse(f) instead of .map(f).parSequence
timeIO(List(first, second, third).map(ns => IO(slowAdd(ns))).parSequence.map(_.sum))
}
def run(args: List[String]): IO[ExitCode] = {
val nums: Seq[Int] = 1 to 4
val results: IO[Seq[(String, Result[Int])]] = for {
serial <- addInSequence(nums, nums, nums)
parallel <- addInParallel(nums, nums, nums)
} yield Seq(("Serial", serial), ("Parallel", parallel))
val report: IO[Unit] = results.map(println)
report.unsafeRunSync()
IO(ExitCode.Success)
}
}
衡量我认为你想要衡量的东西我收到这个结果:
@ ParallelMap.main(Array[String]())
List((Serial,Result(30,12.006349742005114)), (Parallel,Result(30,4.003020468982868)))
这表明并行计算的速度是顺序计算的 3 倍。
我是 cats-effect 库的新手,我 运行 遇到了并行执行的问题。我有一个我认为会受益的应用程序,但是当我在玩具构造上测试这个想法时,我似乎看不出执行时间有什么不同。我觉得我一定错过了一些对其他人来说显而易见的东西,所以我想我会试试运气。在下面的代码中,我有两个跨数字序列求和的实现(addInSequence
和 addInParallel
),它们都在 run()
函数中执行。当我执行 运行 程序时,我注意到它们的 运行 时间几乎相同。我是否遗漏了一些明显的东西?
import cats.Monoid
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
case class Result[A](value: A, secondsElapsed: Double)
object Result {
def total[A](results: Seq[Result[A]])(implicit mon: Monoid[A]): Result[A] = {
val out: Result[A] = results.foldLeft(Result.empty[A]) {
(out: Result[A], next: Result[A]) =>
val newValue: A = mon.combine(out.value, next.value)
val aggTime: Double = out.secondsElapsed + next.secondsElapsed
Result(newValue, aggTime)
}
out
}
def empty[A](implicit mon: Monoid[A]): Result[A] = Result(mon.empty, 0d)
implicit val intAddMon: Monoid[Int] = new Monoid[Int] {
override def empty: Int = 0
override def combine(x: Int, y: Int): Int = x + y
}
}
object ParallelMap extends IOApp {
def slowAdd(nums: Seq[Int]): Int = nums.foldLeft(0) {
(out: Int, next: Int) =>
val seconds: TimeUnit = java.util.concurrent.TimeUnit.SECONDS
val delay: IO[Unit] = IO.sleep(FiniteDuration(1L, seconds))
delay.unsafeRunSync()
out + next
}
def timeIt[A](op: => A): Result[A] = {
val start: Double = System.nanoTime / 1e9
val out: A = op
val stop: Double = System.nanoTime / 1e9
Result(out, stop - start)
}
def addInSequence(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val partialSums: Seq[Result[Int]] = Seq(first, second, third).map( ns => timeIt(slowAdd(ns)) )
IO(Result.total(partialSums))
}
def addInParallel(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val ioSeq: List[IO[Result[Int]]] = List(first, second, third).map( ns => IO(timeIt(slowAdd(ns))) )
val sums: IO[List[Result[Int]]] = ioSeq.parSequence
for {
partialSums <- sums
} yield Result.total(partialSums)
}
override def run(args: List[String]): IO[ExitCode] = {
val nums: Seq[Int] = 1 to 4
val results: IO[Seq[(String, Result[Int])]] = for {
serial <- addInSequence(nums, nums, nums)
parallel <- addInParallel(nums, nums, nums)
} yield Seq(("Serial", serial), ("Parallel", parallel))
val report: IO[Unit] = results.map(println)
report.unsafeRunSync()
IO(ExitCode.Success)
}
}
似乎我应该看到 运行时间减少了三分之一,但我必须以某种方式限制并行执行的能力。但是,docs 似乎没有暗示需要任何额外的设置,我遇到的任何其他示例也没有。任何想法将不胜感激。
两件事:
并不能保证并行操作总是更快。如果您的顺序操作很短,那么从分派到多个线程以及稍后收集所有结果的开销可能大于加速比。[=13=]
看看你测量的是什么。您有一个执行 X 工作量的顺序操作,或 3 个执行 X/3 工作量的操作。您测量所有这些然后比较:运行ning X 顺序的时间与 运行ning X/3 的总时间 3 个任务的工作量。如果顺序 运行 花费了大约 3 秒,而每个并行 运行 花费了大约 1 秒,那么按照这个逻辑,两个版本都需要 3 秒。这可能适用于我们测量 CPU 使用时间,但如果我们测量从所有工作开始到完成的时间,则不完全正确。
如果我运行你的代码我得到
@ ParallelMap.main(Array[String]())
List((Serial,Result(30,12.058612958004232)), (Parallel,Result(30,12.005087116995128)))
但是,如果我 运行 此代码改为:
object ParallelMap extends IOApp {
def slowAdd(nums: Seq[Int]): Int = nums.foldLeft(0) {
(out: Int, next: Int) =>
val seconds: TimeUnit = java.util.concurrent.TimeUnit.SECONDS
val delay: IO[Unit] = IO.sleep(FiniteDuration(1L, seconds))
delay.unsafeRunSync()
out + next
}
def timeIO[A](op: IO[A]): IO[Result[A]] = for {
start <- IO(System.nanoTime / 1e9)
out <- op
stop = System.nanoTime / 1e9
} yield Result(out, stop - start)
def addInSequence(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
timeIO(IO(List(first, second, third).map(ns => slowAdd(ns)).sum))
}
def addInParallel(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
// I changed is as little as possible so that you would still see
// similarity to your code, but normally I would write
// .parTraverse(f) instead of .map(f).parSequence
timeIO(List(first, second, third).map(ns => IO(slowAdd(ns))).parSequence.map(_.sum))
}
def run(args: List[String]): IO[ExitCode] = {
val nums: Seq[Int] = 1 to 4
val results: IO[Seq[(String, Result[Int])]] = for {
serial <- addInSequence(nums, nums, nums)
parallel <- addInParallel(nums, nums, nums)
} yield Seq(("Serial", serial), ("Parallel", parallel))
val report: IO[Unit] = results.map(println)
report.unsafeRunSync()
IO(ExitCode.Success)
}
}
衡量我认为你想要衡量的东西我收到这个结果:
@ ParallelMap.main(Array[String]())
List((Serial,Result(30,12.006349742005114)), (Parallel,Result(30,4.003020468982868)))
这表明并行计算的速度是顺序计算的 3 倍。