Akka broadcast ask doesn't deliver to all routees
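I'm learning to use Broadcast messages with Akka routers. Is there a way to receive the responses to an ask from all of a router's routees?
I have this sample code.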
Master.scala
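import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.{Broadcast, RoundRobinPool}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object Master {
  case object brdcst
}

class Master extends Actor {
  import Master.brdcst

  implicit val timeout: Timeout = Timeout(5.seconds)
  val router: ActorRef = context.actorOf(RoundRobinPool(3).props(Props[Worker]), "router")

  override def receive: Receive = {
    // Backticks match the brdcst object itself; a bare lowercase name would bind any message.
    case `brdcst` =>
      val future = router ? Broadcast(brdcst)
      val result = Await.result(future, timeout.duration)
      println("result = " + result)
  }
}

object MasterTest extends App {
  import Master.brdcst

  val actorSystem = ActorSystem("ActorSystem")
  val actor = actorSystem.actorOf(Props[Master], "root")
  actor ! brdcst
}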
Worker.scala
import akka.actor.Actor
import Master.brdcst

class Worker extends Actor {
  override def receive: Receive = {
    // Reply with this routee's own name, e.g. $a
    case `brdcst` => sender() ! self.path.name
  }
}
This code produces the following output:
result = $a
[INFO] [10/16/2018 21:47:07.484] [ActorSystem-akka.actor.default-dispatcher-2] [akka://ActorSystem/deadLetters] Message [java.lang.String] from Actor[akka://ActorSystem/user/root/router/$a#340358688] to Actor[akka://ActorSystem/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ActorSystem/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/16/2018 21:47:07.504] [ActorSystem-akka.actor.default-dispatcher-10] [akka://ActorSystem/deadLetters] Message [java.lang.String] from Actor[akka://ActorSystem/user/root/router/$b#-151225340] to Actor[akka://ActorSystem/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ActorSystem/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
So the reply comes from only one routee. How can I get the responses from all routees (perhaps as a list such as result = [$a, $b, $c])?
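ask (?) creates an internal actor to handle the reply. This internal actor handles only one reply and then stops itself: that is why you get only the first reply, while the replies from the other two routees become dead letters.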
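The one-reply behavior can be pictured with a plain scala.concurrent.Promise, which can be completed only once. This is only an analogy under that assumption, not Akka's actual internals; OneShotReply and deliver are hypothetical names invented for the sketch.

```scala
import scala.concurrent.Promise

object OneShotReply {
  // Hypothetical stand-in for the temporary actor behind `?`: it accepts one reply only.
  // Returns the accepted reply (if any) and the replies that arrived too late.
  def deliver(replies: Seq[String]): (Option[String], Seq[String]) = {
    val promise = Promise[String]()
    // trySuccess returns true only for the first completion; later replies are rejected.
    val rejected = replies.filterNot(reply => promise.trySuccess(reply))
    val accepted = promise.future.value.flatMap(_.toOption)
    (accepted, rejected)
  }
}
```

Calling OneShotReply.deliver(Seq("$a", "$b", "$c")) accepts "$a" and rejects "$b" and "$c", which mirrors the single result and the two dead letters in the question.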
To get the desired behavior, use tell (!) and collect the routees' replies. For example:
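import akka.actor.{Actor, ActorRef, Props}
import akka.routing.{Broadcast, RoundRobinPool}

class Master extends Actor {
  import Master.brdcst

  val numRoutees = 3
  val router: ActorRef = context.actorOf(RoundRobinPool(numRoutees).props(Props[Worker]), "router")

  // The replies collected so far are carried as a parameter of the current behavior.
  def handleMessages(replies: Set[String] = Set()): Receive = {
    // Backticks match the brdcst object; a bare lowercase name would match every message.
    case `brdcst` =>
      router ! Broadcast(brdcst)
    case reply: String =>
      val updatedReplies = replies + reply
      if (updatedReplies.size == numRoutees) {
        println("result = " + updatedReplies.mkString("[", ",", "]"))
      }
      context.become(handleMessages(updatedReplies))
  }

  def receive: Receive = handleMessages()
}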
In the example above, Master uses become to encode the routees' replies as part of its state.
Also, don't use Await inside actors.
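If a future does need to be handled inside an actor, one common non-blocking option is to pipe its result to an actor as an ordinary message. The following is a minimal sketch, assuming Akka classic actors and a single worker rather than a router; GetName, NameWorker, and NonBlockingMaster are hypothetical names that do not appear in the question.

```scala
import akka.actor.{Actor, ActorRef, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._

// Hypothetical message and actor names, used only for this sketch.
case object GetName

class NameWorker extends Actor {
  def receive: Receive = {
    case GetName => sender() ! self.path.name
  }
}

class NonBlockingMaster extends Actor {
  import context.dispatcher // ExecutionContext needed by map and pipeTo

  implicit val timeout: Timeout = Timeout(5.seconds)
  val worker: ActorRef = context.actorOf(Props[NameWorker], "worker")

  def receive: Receive = {
    case GetName =>
      // No Await: the actor's thread is released immediately, and the result
      // is sent to the original sender once the future completes.
      (worker ? GetName).mapTo[String].map(name => "result = " + name).pipeTo(sender())
  }
}
```

Note that sender() is evaluated while the message is being processed, before the future completes, so the reply goes to the right actor.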