Actor 可以在特定条件下读取消息吗?

Can actors read messages under a certain condition?

我遇到这种情况:

现在我想知道我是否可以做以下事情:

不知道我解释的好不好

编辑:谢谢,答案很好,但我仍有一些疑问。

检查 Become/Unbecome 功能。它可以让你改变演员的行为。

如果我理解正确的话,您希望您的 ActorB 具有两种不同的状态。在第一个状态,它应该缓存它接收到的消息。在第二种状态下,它应该打印所有缓存的消息并开始打印所有新消息。

像这样:

case class Start
case class Stop
case class Message(val: String)

class ActorB extends Actor {

  var cache = Queue()

  def initial: Receive = {
    case Message(msg) => cache += msg
    case Start => 
      for (m <- cache) self ! Message(m)
      context.become(printing)
  }

  def printing: Receive = {
    case Message(msg) => println(msg)
    case Stop => context.become(initial) //or stop the actor
  }

  def receive = initial
}

让 Actor B 在两种状态(两种不同的行为)之间交替。在初始 'pending' 状态下,它等待 'start' 消息,而 stashing 任何其他消息。

收到 'start' 消息后,取消存储所有存储的消息和 become 'active',等待 'stop' 消息并写出收到的其他消息(这将包括未隐藏的)。收到 'stop' 消息后,恢复到 'pending' 行为。

我的一些想法

  1. 是的,如果布尔标志是从数据库或配置文件等系统资源中获取的,但我认为它不应该依赖于任何外部消息,因为参与者从多个其他演员。如果ActorB只被ActorA使用,两者可以合并为一个

  2. 同1,如何处理来自多个actor的消息?如果只有一个actorA,可以合并两个actor。如果有多个,可以在数据库中设置flag,actorA将db中的flag改为"Start"或"Stop"。 Actor B 将根据标志打印或不打印。

一个演员应该在其他演员的状态下非常独立地做某事。开始和停止实际上是ActorA的某种状态而不是ActorB

您完全可以使用 Akka 的 Stash 特性和 become/unbecome 功能来解决您的问题。思路如下:

当您收到 Stop 消息时,您会切换到一种行为,即存储所有不是 Start 的消息。当您收到一条 Start 消息时,您会切换到打印所有收到的消息的行为,此外您还取消存储同时到达的所有消息。

case object Start
case object Stop
case object TriggerStateChange
case object SendMessage

class ActorB extends Actor with Stash {
  override def receive: Receive = {
    case Start =>
      context.become(printingBehavior, false)
      unstashAll()
    case x => stash()
  }

  def printingBehavior: Receive = {
    case msg: String => println(msg)
    case Stop => context.unbecome()
  }
}

class ActorA(val actorB: ActorRef) extends Actor {

  var counter = 0
  var started = false

  override def preStart: Unit = {
    import context.dispatcher

    this.context.system.scheduler.schedule(0 seconds, 5 seconds, self, TriggerStateChange)
    this.context.system.scheduler.schedule(0 seconds, 1 seconds, self, SendMessage)
  }

  override def receive: Actor.Receive = {
    case SendMessage =>
      actorB ! "Message: " + counter
      counter += 1
    case TriggerStateChange =>
      actorB ! (if (started) {
        started = false
        Stop
      } else {
        started = true
        Start
      })
  }
}

object Akka {
  def main(args: Array[String]) = {
    val system = ActorSystem.create("TestActorSystem")

    val actorB = system.actorOf(Props(classOf[ActorB]), "ActorB")
    val actorA = system.actorOf(Props(classOf[ActorA], actorB), "ActorA")

    system.awaitTermination()
  }
}

你已经有了很多好的答案,但不知何故我觉得有必要提供更简短的答案,因为你需要的不一定是状态机:

class ActorB extends Actor {
  def receive: Receive = caching(Nil)

  def caching(cached: List[String]): Receive = {
    case msg: String => 
      context.become(caching(msg :: cached))
    case Start => 
      cached.reverse.foreach(println)
      context.become(caching(Nil))
  }
}