嵌套 Futures 导致性能更差
Nested Futures lead to worse performance
我有一组任务。除了少数大任务外,每个这样的任务通常都相当小。最初我每个任务有一个 Future
。然而,这导致我不得不等待一些 Futures
有很多 CPU 处于空闲状态的更大任务。我想通过检查任务是否超过特定大小来改变这一点,如果是,则再次将任务拆分为多个 Futures
解决子任务。然而,这会导致性能变差,因为第一组期货突然按顺序执行。 None 的任务或子任务是相关的,因此可以相互独立解决。
以下是此行为的概念证明。在 scala 2.13.4 中测试。
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process._
object Main {
val workSet : List[Int] = List(
3000,
3000,
3000,
12000,
3000,
3000,
3000,
12000,
3000,
3000,
3000,
3000,
3000,
)
def stupidWait(wait : Int) : Int = {
println(s"waiting for $wait")
val start = System.currentTimeMillis()
while ( (start + wait) > System.currentTimeMillis()) {}
1
}
def smartWait(wait : Int) : Int = {
val work = if(wait > 3000) {
List(3000,3000,3000,3000)
} else {
List(3000)
}
println(s"smart working with ${work}")
val future = Future.sequence {
work.map {
wait => Future { stupidWait(wait) }
}
}
Await.ready(future, Inf)
1
}
def time() : String = {
"date".!!
}
def main(args : Array[String]) : Unit = {
println(s"${time()} dumb start")
val futureI = Future.sequence {
workSet.map {
wait => Future { stupidWait(wait) }
}
}
Await.ready(futureI,Inf)
println(s"${time()} dumb end")
println(s"${time()} smart start")
val futureII = Future.sequence {
workSet.map {
wait => Future { smartWait(wait) }
}
}
Await.ready(futureII,Inf)
println(s"${time()} smart end")
}
}
输出为:
Thu Mar 4 08:09:42 PM CET 2021
dumb start
...
Thu Mar 4 08:09:54 PM CET 2021
dumb end
Thu Mar 4 08:09:54 PM CET 2021
smart start
...
Thu Mar 4 08:10:39 PM CET 2021
smart end
我原以为智能等待至少会一样快,甚至更快,因为较长的等待时间现在分为 4 个可能并行的等待时间。然而事实并非如此。
为什么 smartWait 没有更快,我必须如何更改代码才能使 smartWait 按预期工作?
执行器可用的线程数受限于您默认拥有的核心数(通常 运行 并行的事情多于您执行它们的核心数是没有意义的)。
我想,你可能有 8 个核心。因此,前 8 个等待立即开始,其他 5 个等待。然后 3 秒后,6 个线程完成了他们的任务,并接管剩下的 5 个。因此,再过 3 秒后,除了两个更大的任务外,其他所有任务都完成了,这两个任务继续旋转了 6 秒。
现在,“(显然不是)智能等待”会怎样?
我在您的输出中添加了自开始以来的秒数和线程名称,以便更轻松地跟踪正在发生的事情:
0: scala-execution-context-global-16: smart working with List(3000)
0: scala-execution-context-global-18: smart working with List(3000)
0: scala-execution-context-global-21: smart working with List(3000, 3000, 3000, 3000)
0: scala-execution-context-global-19: smart working with List(3000)
0: scala-execution-context-global-17: smart working with List(3000, 3000, 3000, 3000)
0: scala-execution-context-global-14: smart working with List(3000)
0: scala-execution-context-global-20: smart working with List(3000)
0: scala-execution-context-global-15: smart working with List(3000)
0: scala-execution-context-global-22: waiting for 3000
3: scala-execution-context-global-22: waiting for 3000
3: scala-execution-context-global-20: waiting for 3000
6: scala-execution-context-global-22: waiting for 3000
6: scala-execution-context-global-20: waiting for 3000
9: scala-execution-context-global-20: waiting for 3000
9: scala-execution-context-global-22: waiting for 3000
12:
12: scala-execution-context-global-22: waiting for 3000
15: scala-execution-context-global-20: waiting for 3000
15: scala-execution-context-global-17: smart working with List(3000)
15: scala-execution-context-global-21: waiting for 3000
15: scala-execution-context-global-22: waiting for 3000
18: scala-execution-context-global-20: waiting for 3000
18: scala-execution-context-global-19: smart working with List(3000)
18: scala-execution-context-global-16: waiting for 3000
18: scala-execution-context-global-21: waiting for 3000
18: scala-execution-context-global-22: smart working with List(3000)
18: scala-execution-context-global-18: smart working with List(3000)
21: scala-execution-context-global-17: waiting for 3000
21: scala-execution-context-global-20: waiting for 3000
21: scala-execution-context-global-16: smart working with List(3000)
21: scala-execution-context-global-15: waiting for 3000
21: scala-execution-context-global-21: waiting for 3000
看看,8个“智能”任务是如何先启动的。它们都被阻塞了,因为没有线程留在池中来执行内部期货。
然后添加线程 #22(第一列中带有 0
的最后一行)。
注意:ForkJoinPool
实现中有一些非常聪明和复杂的逻辑,用于检测池中的所有线程都被阻塞等待一个条件,并在发生这种情况时启动一个额外的(另一个答案中提到的 blocking
用于帮助。遵循该建议将使该测试在大约 3 秒内完成......但是当你需要实际的内核)。它并不 总是 工作,并且不适用于所有线程池实现。如果您使用了默认执行程序以外的其他执行程序(或以某种不同的方式被阻止),那么整个过程很可能会在此时被锁定。
因此,它检测到死锁,并启动线程 22 来解决它。这个新线程从队列中选取一个已提交的期货,并将其 运行 发送三秒钟。这释放了线程 22 和线程 20,因此执行了另外两个任务(在 3 秒标记处)。这还需要 3 秒。看起来获得 运行 的任务属于较大的项目之一,因此没有其他线程被释放,我们仍然只有 20 和 22,其他所有东西都被阻塞了。所以他们选择了另外两个任务 运行 3 秒,依此类推。
请注意,到 12 秒标记时,实际上只有 7 个任务 运行,这大约是列表的 1/3。
您可以像这样继续跟踪输出以查看事情进展情况。
使用 futures 的一个相当普遍的经验法则是永远不要阻塞活动线程。它实际上非常危险,并且可能会锁定整个过程(正如我上面所说,我们在这里没有得到它已经很幸运了)。
在未来外部进行分裂,这样你就不必阻塞等待内部未来到return可能是最简单和最安全的解决方案在你的情况下:
Future.traverse(
workSet.flatMap {
case 3000 => Seq(3000)
case => Seq.fill(4)(3000)
}
) { n => Future(stupidWait(n)) }
或者,为了尽量减少对原始代码的更改,只需将 smartWait
设置为 return 未来而不是等待它,然后在 main
中去掉外部 Future
然后做 Future.traverse(workSet)(smartWait)
。
这应该在大约 6 秒内完成。
尝试:
def stupidWait(wait : Int) : Int = blocking {
println(s"waiting for $wait")
val start = System.currentTimeMillis()
while ( (start + wait) > System.currentTimeMillis()) {}
1
}
Futures 并未针对最大可能的并行性进行优化(因为这通常意味着由于上下文切换和调度程序开销而导致的性能较差。),而是针对最大吞吐量(使用更少的线程)进行优化,并且因为您正在做的事情没有意义(你本质上只是将电子转化为热量),你应该告诉 ExecutionContext 你在语义上阻塞,这允许它创建更多的线程来保持并行性。
一个更好的解决方案,也不会假设底层线程池的功能有任何问题,是将耗时的工作分成离散的块,然后将它们重新提交给 ExecutionContext。这既可以处理其他工作,又可以将对代码本身的影响降到最低。
我有一组任务。除了少数大任务外,每个这样的任务通常都相当小。最初我每个任务有一个 Future
。然而,这导致我不得不等待一些 Futures
有很多 CPU 处于空闲状态的更大任务。我想通过检查任务是否超过特定大小来改变这一点,如果是,则再次将任务拆分为多个 Futures
解决子任务。然而,这会导致性能变差,因为第一组期货突然按顺序执行。 None 的任务或子任务是相关的,因此可以相互独立解决。
以下是此行为的概念证明。在 scala 2.13.4 中测试。
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process._
object Main {
val workSet : List[Int] = List(
3000,
3000,
3000,
12000,
3000,
3000,
3000,
12000,
3000,
3000,
3000,
3000,
3000,
)
def stupidWait(wait : Int) : Int = {
println(s"waiting for $wait")
val start = System.currentTimeMillis()
while ( (start + wait) > System.currentTimeMillis()) {}
1
}
def smartWait(wait : Int) : Int = {
val work = if(wait > 3000) {
List(3000,3000,3000,3000)
} else {
List(3000)
}
println(s"smart working with ${work}")
val future = Future.sequence {
work.map {
wait => Future { stupidWait(wait) }
}
}
Await.ready(future, Inf)
1
}
def time() : String = {
"date".!!
}
def main(args : Array[String]) : Unit = {
println(s"${time()} dumb start")
val futureI = Future.sequence {
workSet.map {
wait => Future { stupidWait(wait) }
}
}
Await.ready(futureI,Inf)
println(s"${time()} dumb end")
println(s"${time()} smart start")
val futureII = Future.sequence {
workSet.map {
wait => Future { smartWait(wait) }
}
}
Await.ready(futureII,Inf)
println(s"${time()} smart end")
}
}
输出为:
Thu Mar 4 08:09:42 PM CET 2021
dumb start
...
Thu Mar 4 08:09:54 PM CET 2021
dumb end
Thu Mar 4 08:09:54 PM CET 2021
smart start
...
Thu Mar 4 08:10:39 PM CET 2021
smart end
我原以为智能等待至少会一样快,甚至更快,因为较长的等待时间现在分为 4 个可能并行的等待时间。然而事实并非如此。
为什么 smartWait 没有更快,我必须如何更改代码才能使 smartWait 按预期工作?
执行器可用的线程数受限于您默认拥有的核心数(通常 运行 并行的事情多于您执行它们的核心数是没有意义的)。
我想,你可能有 8 个核心。因此,前 8 个等待立即开始,其他 5 个等待。然后 3 秒后,6 个线程完成了他们的任务,并接管剩下的 5 个。因此,再过 3 秒后,除了两个更大的任务外,其他所有任务都完成了,这两个任务继续旋转了 6 秒。
现在,“(显然不是)智能等待”会怎样?
我在您的输出中添加了自开始以来的秒数和线程名称,以便更轻松地跟踪正在发生的事情:
0: scala-execution-context-global-16: smart working with List(3000)
0: scala-execution-context-global-18: smart working with List(3000)
0: scala-execution-context-global-21: smart working with List(3000, 3000, 3000, 3000)
0: scala-execution-context-global-19: smart working with List(3000)
0: scala-execution-context-global-17: smart working with List(3000, 3000, 3000, 3000)
0: scala-execution-context-global-14: smart working with List(3000)
0: scala-execution-context-global-20: smart working with List(3000)
0: scala-execution-context-global-15: smart working with List(3000)
0: scala-execution-context-global-22: waiting for 3000
3: scala-execution-context-global-22: waiting for 3000
3: scala-execution-context-global-20: waiting for 3000
6: scala-execution-context-global-22: waiting for 3000
6: scala-execution-context-global-20: waiting for 3000
9: scala-execution-context-global-20: waiting for 3000
9: scala-execution-context-global-22: waiting for 3000
12:
12: scala-execution-context-global-22: waiting for 3000
15: scala-execution-context-global-20: waiting for 3000
15: scala-execution-context-global-17: smart working with List(3000)
15: scala-execution-context-global-21: waiting for 3000
15: scala-execution-context-global-22: waiting for 3000
18: scala-execution-context-global-20: waiting for 3000
18: scala-execution-context-global-19: smart working with List(3000)
18: scala-execution-context-global-16: waiting for 3000
18: scala-execution-context-global-21: waiting for 3000
18: scala-execution-context-global-22: smart working with List(3000)
18: scala-execution-context-global-18: smart working with List(3000)
21: scala-execution-context-global-17: waiting for 3000
21: scala-execution-context-global-20: waiting for 3000
21: scala-execution-context-global-16: smart working with List(3000)
21: scala-execution-context-global-15: waiting for 3000
21: scala-execution-context-global-21: waiting for 3000
看看,8个“智能”任务是如何先启动的。它们都被阻塞了,因为没有线程留在池中来执行内部期货。
然后添加线程 #22(第一列中带有 0
的最后一行)。
注意:ForkJoinPool
实现中有一些非常聪明和复杂的逻辑,用于检测池中的所有线程都被阻塞等待一个条件,并在发生这种情况时启动一个额外的(另一个答案中提到的 blocking
用于帮助。遵循该建议将使该测试在大约 3 秒内完成......但是当你需要实际的内核)。它并不 总是 工作,并且不适用于所有线程池实现。如果您使用了默认执行程序以外的其他执行程序(或以某种不同的方式被阻止),那么整个过程很可能会在此时被锁定。
因此,它检测到死锁,并启动线程 22 来解决它。这个新线程从队列中选取一个已提交的期货,并将其 运行 发送三秒钟。这释放了线程 22 和线程 20,因此执行了另外两个任务(在 3 秒标记处)。这还需要 3 秒。看起来获得 运行 的任务属于较大的项目之一,因此没有其他线程被释放,我们仍然只有 20 和 22,其他所有东西都被阻塞了。所以他们选择了另外两个任务 运行 3 秒,依此类推。
请注意,到 12 秒标记时,实际上只有 7 个任务 运行,这大约是列表的 1/3。
您可以像这样继续跟踪输出以查看事情进展情况。
使用 futures 的一个相当普遍的经验法则是永远不要阻塞活动线程。它实际上非常危险,并且可能会锁定整个过程(正如我上面所说,我们在这里没有得到它已经很幸运了)。
在未来外部进行分裂,这样你就不必阻塞等待内部未来到return可能是最简单和最安全的解决方案在你的情况下:
Future.traverse(
workSet.flatMap {
case 3000 => Seq(3000)
case => Seq.fill(4)(3000)
}
) { n => Future(stupidWait(n)) }
或者,为了尽量减少对原始代码的更改,只需将 smartWait
设置为 return 未来而不是等待它,然后在 main
中去掉外部 Future
然后做 Future.traverse(workSet)(smartWait)
。
这应该在大约 6 秒内完成。
尝试:
def stupidWait(wait : Int) : Int = blocking {
println(s"waiting for $wait")
val start = System.currentTimeMillis()
while ( (start + wait) > System.currentTimeMillis()) {}
1
}
Futures 并未针对最大可能的并行性进行优化(因为这通常意味着由于上下文切换和调度程序开销而导致的性能较差。),而是针对最大吞吐量(使用更少的线程)进行优化,并且因为您正在做的事情没有意义(你本质上只是将电子转化为热量),你应该告诉 ExecutionContext 你在语义上阻塞,这允许它创建更多的线程来保持并行性。
一个更好的解决方案,也不会假设底层线程池的功能有任何问题,是将耗时的工作分成离散的块,然后将它们重新提交给 ExecutionContext。这既可以处理其他工作,又可以将对代码本身的影响降到最低。