在 Akka 中推迟消息的正确方法
Correct way to postpone messages in Akka
我正在使用 akka 集群来分两个阶段执行分布式计算。首先 phaseA
然后 phaseB
。为了处理阶段,我使用 akka 的 FSM。
没有硬同步,因此其中一个节点可能会达到 phaseB
,而其他节点仍处于 phaseA
。
问题是,phaseB
中的一个人向其他人发送 phaseB-related
消息(他们还在 phaseA
中)是什么原因导致他们丢失 phaseB-related
消息。
现在我使用简单的技巧来推迟未知消息:
case any => self ! any
但是在我看来,这不是正确的做法。我知道我也可以使用 akka scheduler 安排 any
,但我也不喜欢这样。
这里是简化的代码:
package whatever
import akka.actor._
object Test extends App {
case object PhaseA
case object PhaseB
class Any extends Actor {
def phaseA: Receive = {
case PhaseA => {
context.become(phaseB)
println("in phaseB now")
}
case any => self ! any
}
def phaseB: Receive = {
case PhaseB => println("got phaseB message !")
}
def receive = phaseA
}
val system = ActorSystem("MySystem")
val any = system.actorOf(Props(new Any), name = "any")
any ! PhaseB
any ! PhaseA
}
在这种情况下,推迟消息的正确方法是什么?
您可以隐藏消息以供以后处理。将 akka.actor.Stash
混入您的演员和 stash()
您的 phaseB
消息中以备后用。
当您的 FSM 处于 phaseA
并收到一条 phaseB
消息时,调用 stash()
。当该 actor 然后转换到 phaseB
状态时,调用 unstashAll()
并且所有隐藏的消息将被重新传递。
我正在使用 akka 集群来分两个阶段执行分布式计算。首先 phaseA
然后 phaseB
。为了处理阶段,我使用 akka 的 FSM。
没有硬同步,因此其中一个节点可能会达到 phaseB
,而其他节点仍处于 phaseA
。
问题是,phaseB
中的一个人向其他人发送 phaseB-related
消息(他们还在 phaseA
中)是什么原因导致他们丢失 phaseB-related
消息。
现在我使用简单的技巧来推迟未知消息:
case any => self ! any
但是在我看来,这不是正确的做法。我知道我也可以使用 akka scheduler 安排 any
,但我也不喜欢这样。
这里是简化的代码:
package whatever
import akka.actor._
object Test extends App {
case object PhaseA
case object PhaseB
class Any extends Actor {
def phaseA: Receive = {
case PhaseA => {
context.become(phaseB)
println("in phaseB now")
}
case any => self ! any
}
def phaseB: Receive = {
case PhaseB => println("got phaseB message !")
}
def receive = phaseA
}
val system = ActorSystem("MySystem")
val any = system.actorOf(Props(new Any), name = "any")
any ! PhaseB
any ! PhaseA
}
在这种情况下,推迟消息的正确方法是什么?
您可以隐藏消息以供以后处理。将 akka.actor.Stash
混入您的演员和 stash()
您的 phaseB
消息中以备后用。
当您的 FSM 处于 phaseA
并收到一条 phaseB
消息时,调用 stash()
。当该 actor 然后转换到 phaseB
状态时,调用 unstashAll()
并且所有隐藏的消息将被重新传递。