Actor 中的阻塞操作不占用所有默认调度程序

Blocking Operation in Actor NOT Occupying All Default Dispatchers

最近在学习Akka Actor。我阅读了 Actor 中调度员的文档。我很好奇演员中的阻塞操作。文档最后的topic描述了如何解决这个问题。我正在尝试重现文档中的示例实验。

这是我的代码:

package dispatcher

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Main extends App{

  var config = ConfigFactory.parseString(
    """
      |my-dispatcher{
      |type = Dispatcher
      |
      |executor = "fork-join-executor"
      |
      |fork-join-executor{
      |fixed-pool-size = 32
      |}
      |throughput = 1
      |}
    """.stripMargin)

//  val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))


  val system = ActorSystem("block")


  val actor1 = system.actorOf(Props(new BlockingFutureActor()))
  val actor2 = system.actorOf(Props(new PrintActor()))

  for(i <- 1 to 1000){
    actor1 ! i
    actor2 ! i
  }

}

package dispatcher

import akka.actor.Actor

import scala.concurrent.{ExecutionContext, Future}

class BlockingFutureActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      Thread.sleep(5000)
      implicit val excutionContext: ExecutionContext = context.dispatcher
      Future {
        Thread.sleep(5000)
        println(s"Blocking future finished ${i}")
      }
  }
}
package dispatcher

import akka.actor.Actor

class PrintActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      println(s"PrintActor: ${i}")
  }
}

我只是用默认调度程序创建了一个 ActorSystem,所有参与者都依赖于它们。 BlockingFutureActor 有一个封装在 Future 中的阻塞操作。 PrintActor 只是立即打印一个数字。

在文档的解释中,默认调度器会被BlockingFutureActor中的Future个占用,从而导致PrintActor的消息阻塞。应用程序卡在某个地方,例如:

> PrintActor: 44
> PrintActor: 45

不幸的是,我的代码没有被阻止。 PrintActor 的所有输出都顺利显示。但是 BlockingFutureActor 的输出就像挤牙膏一样。我尝试通过 Intellij 的调试监控我的线程信息,我得到:

您可能会发现只有两个调度员在睡觉(BlockingFutureActor 使这种情况发生)。其他人正在等待,这意味着他们可以发送新消息。

我已经阅读了关于 Actor(page) 中阻塞操作的回答。引用 "Dispatchers are, effectively, thread-pools. Separating the two guarantees that the slow, blocking operations don't starve the other. This approach, in general, is referred to as bulk-heading, because the idea is that if a part of the app fails, the rest remains responsive."

默认调度器是否会保留一些调度器用于阻塞操作?这样即使有这么多阻塞操作请求调度程序,系统也可以处理消息。

Akka文档中的实验可以复现吗?是不是我配置有问题

感谢您的建议。最良好的祝愿。

您在 BlockingFutureActor 的任何打印语句之前看到 PrintActor 的所有 1000 条打印语句的原因是因为 BlockingFutureActor 中的第一个 Thread.sleep 调用s receive 块。这个Thread.sleep是你的代码和官方文档中例子的关键区别:

override def receive: Receive = {
  case i: Int =>
    Thread.sleep(5000) // <----- this call is not in the example in the official docs
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

请记住,参与者一次处理一条消息。 Thread.sleep(5000) 基本上模拟了至少需要五秒钟来处理的消息。 BlockingFutureActor 在处理完当前消息之前不会处理另一条消息,即使它的邮箱中有数百条消息也是如此。当 BlockingFutureActor 正在处理第一条 Int 条值为 1 的消息时,PrintActor 已经处理完发送给它的所有 1000 条消息。为了使这一点更清楚,让我们添加一个 println 语句:

override def receive: Receive = {
  case i: Int =>
    println(s"Entering BlockingFutureActor's receive: $i") // <-----
    Thread.sleep(5000)
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

当我们 运行 程序时的示例输出:

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...

如您所见,当 BlockingFutureActor 实际开始处理消息 2 时,PrintActor 已经处理完所有 1000 条消息。

如果您首先删除 Thread.sleep,那么您会看到邮件从 BlockingFutureActor 的邮箱中更快地出列,因为工作正在 "delegated" 到 Future。创建 Future 后,actor 会从其邮箱中获取下一条消息,而无需等待 Future 完成。下面是没有第一个 Thread.sleep 的示例输出(每次 运行 它都不会完全相同):

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...