Akka Actor FSM 的发布订阅不起作用
Publish-Subscribe for Akka Actor FSM not working
我有基本特征和子 class 这个特征。每个子 class 使用自己的事件 class 订阅事件流。例如 ActorFSM1 关心 InitEventImpl1 所以它会订阅这个事件。
但是,当我发布这些特定事件时,没有演员收到它们。
trait InitEvent
case class InitEventImpl1 extends InitEvent
case class InitEventImpl2 extends InitEvent
class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
}
class ActorFSM2 extends ActorFSM[InitEventImpl2] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl2] )
}
当我尝试按如下方式发布它们时,没有人收到消息。我究竟做错了什么?
val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
system.eventStream.publish(InitEventImpl1())
无法保证在发布消息时演员已经初始化,也无法保证在 publish
之前调用了对 subscribe
的调用。这是一个经典的演员构造竞争条件。 ActorRef
的创建是同步的,但实际 Actor
的创建(当订阅发生时)是异步的,并且可能在创建 ActorRef
之后很长时间内发生。您可以通过向流发送一堆消息来测试这一点,看看 Actor
是否最终看到其中的一些。
trait InitEvent
case class InitEventImpl1(id: Int) extends InitEvent
class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
def receive: Receive = {
case InitEventImpl1(id) => println(id)
}
}
然后发布一堆消息,看看它是否最终得到其中的一些:
val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
Seq.tabulate(100)(InitEventImpl1(_)).foreach(system.eventStream.publish)
如果 Actor
绝对必须接收消息,我建议直接将消息发送到 ActorRef
或等待来自 Actor
的消息表明它已准备好接收消息。
另一方面,您在这里使用泛型似乎对您没有任何帮助。看来您想要做的是在 subscribe
调用和 receive
中使用通用参数。这可以通过一些反射魔法来完成:
import akka.actor.Actor
import scala.reflect._
class Test[E <: InitEvent : ClassTag] extends Actor {
context.system.eventStream.subscribe(self, classTag[E].runtimeClass)
def receive: Receive = {
case message: E => println(message)
}
}
然后在创建时指定消息类型ActorRef
val myTypeActor = system.actorOf(Props(new Test[InitEventImpl1]))
myTypeActor ! InitEventImpl1 // Will be processed
myTypeActor ! InitEventImpl2 // Will not be processed
我有基本特征和子 class 这个特征。每个子 class 使用自己的事件 class 订阅事件流。例如 ActorFSM1 关心 InitEventImpl1 所以它会订阅这个事件。 但是,当我发布这些特定事件时,没有演员收到它们。
trait InitEvent
case class InitEventImpl1 extends InitEvent
case class InitEventImpl2 extends InitEvent
class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
}
class ActorFSM2 extends ActorFSM[InitEventImpl2] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl2] )
}
当我尝试按如下方式发布它们时,没有人收到消息。我究竟做错了什么?
val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
system.eventStream.publish(InitEventImpl1())
无法保证在发布消息时演员已经初始化,也无法保证在 publish
之前调用了对 subscribe
的调用。这是一个经典的演员构造竞争条件。 ActorRef
的创建是同步的,但实际 Actor
的创建(当订阅发生时)是异步的,并且可能在创建 ActorRef
之后很长时间内发生。您可以通过向流发送一堆消息来测试这一点,看看 Actor
是否最终看到其中的一些。
trait InitEvent
case class InitEventImpl1(id: Int) extends InitEvent
class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
def receive: Receive = {
case InitEventImpl1(id) => println(id)
}
}
然后发布一堆消息,看看它是否最终得到其中的一些:
val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
Seq.tabulate(100)(InitEventImpl1(_)).foreach(system.eventStream.publish)
如果 Actor
绝对必须接收消息,我建议直接将消息发送到 ActorRef
或等待来自 Actor
的消息表明它已准备好接收消息。
另一方面,您在这里使用泛型似乎对您没有任何帮助。看来您想要做的是在 subscribe
调用和 receive
中使用通用参数。这可以通过一些反射魔法来完成:
import akka.actor.Actor
import scala.reflect._
class Test[E <: InitEvent : ClassTag] extends Actor {
context.system.eventStream.subscribe(self, classTag[E].runtimeClass)
def receive: Receive = {
case message: E => println(message)
}
}
然后在创建时指定消息类型ActorRef
val myTypeActor = system.actorOf(Props(new Test[InitEventImpl1]))
myTypeActor ! InitEventImpl1 // Will be processed
myTypeActor ! InitEventImpl2 // Will not be processed