Akka Actor FSM 的发布订阅不起作用

Publish-Subscribe for Akka Actor FSM not working

我有基本特征和子 class 这个特征。每个子 class 使用自己的事件 class 订阅事件流。例如 A​​ctorFSM1 关心 InitEventImpl1 所以它会订阅这个事件。 但是,当我发布这些特定事件时,没有演员收到它们。

trait InitEvent
case class InitEventImpl1 extends InitEvent
case class InitEventImpl2 extends InitEvent

class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
}
class ActorFSM2 extends ActorFSM[InitEventImpl2] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl2] )
}

当我尝试按如下方式发布它们时,没有人收到消息。我究竟做错了什么?

 val system = ActorSystem("system")
 val actor = system.actorOf(Props(new ActorFSM1()) )
 system.eventStream.publish(InitEventImpl1())

无法保证在发布消息时演员已经初始化,也无法保证在 publish 之前调用了对 subscribe 的调用。这是一个经典的演员构造竞争条件。 ActorRef 的创建是同步的,但实际 Actor 的创建(当订阅发生时)是异步的,并且可能在创建 ActorRef 之后很长时间内发生。您可以通过向流发送一堆消息来测试这一点,看看 Actor 是否最终看到其中的一些。

trait InitEvent
case class InitEventImpl1(id: Int) extends InitEvent

class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
  def receive: Receive = {
    case InitEventImpl1(id) => println(id)
  }
}

然后发布一堆消息,看看它是否最终得到其中的一些:

val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
Seq.tabulate(100)(InitEventImpl1(_)).foreach(system.eventStream.publish)

如果 Actor 绝对必须接收消息,我建议直接将消息发送到 ActorRef 或等待来自 Actor 的消息表明它已准备好接收消息。

另一方面,您在这里使用泛型似乎对您没有任何帮助。看来您想要做的是在 subscribe 调用和 receive 中使用通用参数。这可以通过一些反射魔法来完成:

import akka.actor.Actor
import scala.reflect._

class Test[E <: InitEvent : ClassTag] extends Actor {
   context.system.eventStream.subscribe(self, classTag[E].runtimeClass)
   def receive: Receive = {
     case message: E => println(message)
   }
}

然后在创建时指定消息类型ActorRef

val myTypeActor = system.actorOf(Props(new Test[InitEventImpl1]))
myTypeActor ! InitEventImpl1 // Will be processed
myTypeActor ! InitEventImpl2 // Will not be processed