在 Akka 中推迟消息的正确方法

Correct way to postpone messages in Akka

我正在使用 akka 集群来分两个阶段执行分布式计算。首先 phaseA 然后 phaseB。为了处理阶段,我使用 akka 的 FSM。

没有硬同步,因此其中一个节点可能会达到 phaseB,而其他节点仍处于 phaseA

问题是,phaseB 中的一个人向其他人发送 phaseB-related 消息(他们还在 phaseA 中)是什么原因导致他们丢失 phaseB-related 消息。

现在我使用简单的技巧来推迟未知消息:

case any => self ! any

但是在我看来,这不是正确的做法。我知道我也可以使用 akka scheduler 安排 any,但我也不喜欢这样。

这里是简化的代码:

package whatever

import akka.actor._

object Test extends App {

  case object PhaseA
  case object PhaseB

  class Any extends Actor {

    def phaseA: Receive = {
      case PhaseA => {
        context.become(phaseB)
        println("in phaseB now")
      }
      case any => self ! any
    }

    def phaseB: Receive = {
      case PhaseB => println("got phaseB message !")
    }

    def receive = phaseA

  }

  val system = ActorSystem("MySystem")
  val any = system.actorOf(Props(new Any), name = "any")
  any ! PhaseB
  any ! PhaseA
}

在这种情况下,推迟消息的正确方法是什么?

您可以隐藏消息以供以后处理。将 akka.actor.Stash 混入您的演员和 stash() 您的 phaseB 消息中以备后用。

当您的 FSM 处于 phaseA 并收到一条 phaseB 消息时,调用 stash()。当该 actor 然后转换到 phaseB 状态时,调用 unstashAll() 并且所有隐藏的消息将被重新传递。