Actor 中的阻塞操作不占用所有默认调度程序
Blocking Operation in Actor NOT Occupying All Default Dispatchers
最近在学习Akka Actor。我阅读了 Actor 中调度员的文档。我很好奇演员中的阻塞操作。文档最后的topic描述了如何解决这个问题。我正在尝试重现文档中的示例实验。
这是我的代码:
package dispatcher
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Main extends App{
var config = ConfigFactory.parseString(
"""
|my-dispatcher{
|type = Dispatcher
|
|executor = "fork-join-executor"
|
|fork-join-executor{
|fixed-pool-size = 32
|}
|throughput = 1
|}
""".stripMargin)
// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))
val system = ActorSystem("block")
val actor1 = system.actorOf(Props(new BlockingFutureActor()))
val actor2 = system.actorOf(Props(new PrintActor()))
for(i <- 1 to 1000){
actor1 ! i
actor2 ! i
}
}
package dispatcher
import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
class BlockingFutureActor extends Actor{
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
Thread.sleep(5000)
println(s"Blocking future finished ${i}")
}
}
}
package dispatcher
import akka.actor.Actor
class PrintActor extends Actor{
override def receive: Receive = {
case i: Int =>
println(s"PrintActor: ${i}")
}
}
我只是用默认调度程序创建了一个 ActorSystem
,所有参与者都依赖于它们。 BlockingFutureActor
有一个封装在 Future
中的阻塞操作。 PrintActor
只是立即打印一个数字。
在文档的解释中,默认调度器会被BlockingFutureActor
中的Future
个占用,从而导致PrintActor
的消息阻塞。应用程序卡在某个地方,例如:
> PrintActor: 44
> PrintActor: 45
不幸的是,我的代码没有被阻止。 PrintActor
的所有输出都顺利显示。但是 BlockingFutureActor
的输出就像挤牙膏一样。我尝试通过 Intellij 的调试监控我的线程信息,我得到:
您可能会发现只有两个调度员在睡觉(BlockingFutureActor
使这种情况发生)。其他人正在等待,这意味着他们可以发送新消息。
我已经阅读了关于 Actor(page) 中阻塞操作的回答。引用 "Dispatchers are, effectively, thread-pools. Separating the two guarantees that the slow, blocking operations don't starve the other. This approach, in general, is referred to as bulk-heading, because the idea is that if a part of the app fails, the rest remains responsive."
默认调度器是否会保留一些调度器用于阻塞操作?这样即使有这么多阻塞操作请求调度程序,系统也可以处理消息。
Akka文档中的实验可以复现吗?是不是我配置有问题
感谢您的建议。最良好的祝愿。
您在 BlockingFutureActor
的任何打印语句之前看到 PrintActor
的所有 1000 条打印语句的原因是因为 BlockingFutureActor
中的第一个 Thread.sleep
调用s receive
块。这个Thread.sleep
是你的代码和官方文档中例子的关键区别:
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000) // <----- this call is not in the example in the official docs
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
请记住,参与者一次处理一条消息。 Thread.sleep(5000)
基本上模拟了至少需要五秒钟来处理的消息。 BlockingFutureActor
在处理完当前消息之前不会处理另一条消息,即使它的邮箱中有数百条消息也是如此。当 BlockingFutureActor
正在处理第一条 Int
条值为 1
的消息时,PrintActor
已经处理完发送给它的所有 1000 条消息。为了使这一点更清楚,让我们添加一个 println
语句:
override def receive: Receive = {
case i: Int =>
println(s"Entering BlockingFutureActor's receive: $i") // <-----
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
当我们 运行 程序时的示例输出:
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...
如您所见,当 BlockingFutureActor
实际开始处理消息 2
时,PrintActor
已经处理完所有 1000 条消息。
如果您首先删除 Thread.sleep
,那么您会看到邮件从 BlockingFutureActor
的邮箱中更快地出列,因为工作正在 "delegated" 到 Future
。创建 Future
后,actor 会从其邮箱中获取下一条消息,而无需等待 Future
完成。下面是没有第一个 Thread.sleep
的示例输出(每次 运行 它都不会完全相同):
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...
最近在学习Akka Actor。我阅读了 Actor 中调度员的文档。我很好奇演员中的阻塞操作。文档最后的topic描述了如何解决这个问题。我正在尝试重现文档中的示例实验。
这是我的代码:
package dispatcher
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Main extends App{
var config = ConfigFactory.parseString(
"""
|my-dispatcher{
|type = Dispatcher
|
|executor = "fork-join-executor"
|
|fork-join-executor{
|fixed-pool-size = 32
|}
|throughput = 1
|}
""".stripMargin)
// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))
val system = ActorSystem("block")
val actor1 = system.actorOf(Props(new BlockingFutureActor()))
val actor2 = system.actorOf(Props(new PrintActor()))
for(i <- 1 to 1000){
actor1 ! i
actor2 ! i
}
}
package dispatcher
import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
class BlockingFutureActor extends Actor{
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
Thread.sleep(5000)
println(s"Blocking future finished ${i}")
}
}
}
package dispatcher
import akka.actor.Actor
class PrintActor extends Actor{
override def receive: Receive = {
case i: Int =>
println(s"PrintActor: ${i}")
}
}
我只是用默认调度程序创建了一个 ActorSystem
,所有参与者都依赖于它们。 BlockingFutureActor
有一个封装在 Future
中的阻塞操作。 PrintActor
只是立即打印一个数字。
在文档的解释中,默认调度器会被BlockingFutureActor
中的Future
个占用,从而导致PrintActor
的消息阻塞。应用程序卡在某个地方,例如:
> PrintActor: 44
> PrintActor: 45
不幸的是,我的代码没有被阻止。 PrintActor
的所有输出都顺利显示。但是 BlockingFutureActor
的输出就像挤牙膏一样。我尝试通过 Intellij 的调试监控我的线程信息,我得到:
您可能会发现只有两个调度员在睡觉(BlockingFutureActor
使这种情况发生)。其他人正在等待,这意味着他们可以发送新消息。
我已经阅读了关于 Actor(page) 中阻塞操作的回答。引用 "Dispatchers are, effectively, thread-pools. Separating the two guarantees that the slow, blocking operations don't starve the other. This approach, in general, is referred to as bulk-heading, because the idea is that if a part of the app fails, the rest remains responsive."
默认调度器是否会保留一些调度器用于阻塞操作?这样即使有这么多阻塞操作请求调度程序,系统也可以处理消息。
Akka文档中的实验可以复现吗?是不是我配置有问题
感谢您的建议。最良好的祝愿。
您在 BlockingFutureActor
的任何打印语句之前看到 PrintActor
的所有 1000 条打印语句的原因是因为 BlockingFutureActor
中的第一个 Thread.sleep
调用s receive
块。这个Thread.sleep
是你的代码和官方文档中例子的关键区别:
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000) // <----- this call is not in the example in the official docs
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
请记住,参与者一次处理一条消息。 Thread.sleep(5000)
基本上模拟了至少需要五秒钟来处理的消息。 BlockingFutureActor
在处理完当前消息之前不会处理另一条消息,即使它的邮箱中有数百条消息也是如此。当 BlockingFutureActor
正在处理第一条 Int
条值为 1
的消息时,PrintActor
已经处理完发送给它的所有 1000 条消息。为了使这一点更清楚,让我们添加一个 println
语句:
override def receive: Receive = {
case i: Int =>
println(s"Entering BlockingFutureActor's receive: $i") // <-----
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
当我们 运行 程序时的示例输出:
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...
如您所见,当 BlockingFutureActor
实际开始处理消息 2
时,PrintActor
已经处理完所有 1000 条消息。
如果您首先删除 Thread.sleep
,那么您会看到邮件从 BlockingFutureActor
的邮箱中更快地出列,因为工作正在 "delegated" 到 Future
。创建 Future
后,actor 会从其邮箱中获取下一条消息,而无需等待 Future
完成。下面是没有第一个 Thread.sleep
的示例输出(每次 运行 它都不会完全相同):
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...