Why are Futures within Futures running sequentially when started on Akka Dispatcher

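When we try to start multiple Futures from within an actor's receive method, we observe a strange behavior. If we use our configured dispatcher as the ExecutionContext, the Futures run on the same thread, one after another. If we use ExecutionContext.Implicits.global, the Futures run in parallel as expected.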

We boiled the code down to the following example (a more complete example is below):

implicit val ec = context.getDispatcher

Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }

Future {
   Future{ doWork() } 
   Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
   Future{ doWork() }
   Future{ doWork() }
}

A compilable example looks like this:

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")   

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started inside a Future: running sequentially. Why?
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(startFuture)
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }


}

We tried both thread-pool-executor and fork-join-executor, with the same result.

Are we using Futures the wrong way? How should parallel tasks be spawned, then?

This is related to the dispatcher's "throughput" setting. I added a "fair-dispatcher" to application.conf to demonstrate this:

fair-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 1
}

Here is your example, modified to use the fair dispatcher for the nested Futures and to print the current values of the throughput setting:

package com.test

import akka.actor.ActorSystem

import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")

  println("Default dispatcher throughput:")
  println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))

  println("Fair dispatcher throughput:")
  println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started inside a Future: now submitted to the fair dispatcher
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

Output:

Default dispatcher throughput:
5
Fair dispatcher throughput:
1
Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
Start Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
Started Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 4 finished on thread Experimental-fair-dispatcher-15
Future 2 finished on thread Experimental-fair-dispatcher-13
Future 1 finished on thread Experimental-fair-dispatcher-12
Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 5 finished on thread Experimental-fair-dispatcher-17
Future 7 finished on thread Experimental-fair-dispatcher-18
Future 8 finished on thread Experimental-fair-dispatcher-19
Future 6 finished on thread Experimental-fair-dispatcher-16
Future 3 finished on thread Experimental-fair-dispatcher-14
Future 9 finished on thread Experimental-fair-dispatcher-15

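As you can see, fair-dispatcher uses a different thread for most of the Futures.

The default dispatcher is optimized for actors, so its throughput is set to 5 to minimize context switches. This improves message-processing throughput while keeping some degree of fairness.

The only change in my fair-dispatcher is throughput = 1, i.e. every async execution request gets its own thread if possible (up to parallelism-max).

I'd recommend creating separate dispatchers for Futures that are used for different purposes, e.g. one dispatcher (i.e. thread pool) for calling web services, another one for blocking database access, and so on. That lets you control each workload more precisely by tuning the settings of its own dispatcher.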
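As a rough sketch of what that could look like in application.conf: the dispatcher names web-client-dispatcher and blocking-db-dispatcher and all the numbers are made up for illustration, so tune them for your own workload.

```
web-client-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 1
}

blocking-db-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    # Fixed number of threads, sized to the expected number of concurrent blocking calls
    fixed-pool-size = 16
  }
  throughput = 1
}
```

Each of them can then be looked up the same way as fair-dispatcher in the example above, e.g. actorSystem.dispatchers.lookup("blocking-db-dispatcher"), and passed as the ExecutionContext for the corresponding Futures.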

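Have a look at https://doc.akka.io/docs/akka/current/dispatchers.html; it is very useful for understanding the details.

Also check out the Akka reference settings (default-dispatcher in particular), which contain a lot of useful comments: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf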

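After some research, I found out that the Dispatcher class implements akka.dispatch.BatchingExecutor. For performance reasons, this class checks which tasks should be batched on the same thread. Future.map internally creates a scala.concurrent.OnCompleteRunnable, which is batched in the BatchingExecutor.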

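This seems reasonable for map() / flatMap(), where one task generates one subsequent task, but not for explicit new Futures that are used to fork work. Internally, Future.apply is implemented via Future.successful().map and is therefore batched. My workaround for now is to create Futures in a different way: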

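import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

object MyFuture {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    // A plain Runnable instead of the OnCompleteRunnable that Future.apply creates
    class FuturesStarter extends Runnable {
      override def run(): Unit = {
        // Try captures any exception thrown by body and fails the promise with it
        promise.complete(Try(body))
      }
    }
    executor.execute(new FuturesStarter)
    promise.future
  }
}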

The FuturesStarter Runnables are not batched and therefore run in parallel.

Can anybody confirm that this solution is OK? Are there better ways to solve this problem? Is the current implementation of Future / BatchingExecutor intended, or is it a bug?
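One alternative that may be worth comparing against is to not use the Akka dispatcher for the forked work at all. The following is only a minimal sketch using the Scala standard library. The object name NestedFuturesOnPlainPool and the pool size of 4 are my own choices, and it assumes that an ExecutionContext created with ExecutionContext.fromExecutor does not batch nested tasks (which is what I would expect, since it is not an Akka MessageDispatcher):

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object NestedFuturesOnPlainPool {
  // Starts four Futures from inside an outer Future and returns the names
  // of the threads the inner Futures ran on.
  def run(): Set[String] = {
    val pool = Executors.newFixedThreadPool(4) // hypothetical size, for illustration only
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val nested = Future {
        // Inner Futures are submitted to the same plain thread pool
        (1 to 4).map { _ =>
          Future {
            Thread.sleep(200)
            Thread.currentThread().getName
          }
        }
      }.flatMap(inner => Future.sequence(inner))
      Await.result(nested, 5.seconds).toSet
    } finally {
      pool.shutdown()
    }
  }
}
```

If the inner Futures really run in parallel, NestedFuturesOnPlainPool.run() should return more than one thread name. The trade-off is that you have to manage the lifecycle of that pool yourself instead of leaving it to the ActorSystem.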

From the description of Akka's internal BatchingExecutor (emphasis mine):

Mixin trait for an Executor which groups multiple nested Runnable.run() calls into a single Runnable passed to the original Executor. This can be a useful optimization because it bypasses the original context's task queue and keeps related (nested) code on a single thread which may improve CPU affinity. However, if tasks passed to the Executor are blocking or expensive, this optimization can prevent work-stealing and make performance worse....A batching executor can create deadlocks if code does not use scala.concurrent.blocking when it should, because tasks created within other tasks will block on the outer task completing.

If you're using a dispatcher that mixes in BatchingExecutor, namely a subclass of MessageDispatcher, you can use the scala.concurrent.blocking construct to enable parallelism with nested Futures:

Future {
  Future {
    blocking {
      doBlockingWork()
    }
  }
}

In your example, you would add blocking in the startFuture method:

private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
  blocking {
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

Sample output from running startFutures(true)(actorSystem.dispatcher) with the above change:

Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4