如何观看多个 akka 演员终止

how to watch multiple akka actors for termination

我有 akka 系统,它基本上是两个生产者 actor 向一个消费者 actor 发送消息。在简化形式中,我有这样的东西:

class ProducerA extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageA()
    }

    ... more code ...
}

class ProducerB extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageB()
    }

    ... more code ...
}

class Consumer extends Actor {
    def receive = {
        case A => handleMessageA(A)
        case B => handleMessageB(B)
    }

    ... more code ...
}

而且他们都是同一个akka系统的兄弟。

我正在想办法优雅地终止这个系统。这意味着在关闭时我希望 ProducerAProducerB 立即停止然后我希望 Consumer 完成处理消息队列中剩余的任何消息然后关闭。

看来我想要的是 Consumer 演员能够看到 ProducerAProducerB 的终止。或者一般来说,我想要的似乎是能够在两个生产者都停止后向 Consumer 发送一条 PoisonPill 消息。

https://alvinalexander.com/scala/how-to-monitor-akka-actor-death-with-watch-method

上面的教程很好地解释了一个 actor 如何监视另一个 actor 的终止,但不确定一个 actor 如何监视多个 actor 的终止。

import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration.DurationInt

class Producer extends Actor {
  def receive = {
    case _ => println("Producer received a message")
  }
}

case object KillConsumer

class Consumer extends Actor {

  def receive = {
    case KillConsumer =>
      println("Stopping Consumer After All Producers")
      context.stop(self)
    case _ => println("Parent received a message")
  }

  override def postStop(): Unit = {
    println("Post Stop Consumer")
    super.postStop()
  }
}

class ProducerWatchers(producerListRef: List[ActorRef], consumerRef: ActorRef) extends Actor {
  producerListRef.foreach(x => context.watch(x))
  context.watch(consumerRef)
  var producerActorCount = producerListRef.length
  implicit val timeout: Timeout = Timeout(5 seconds)
  override def receive: Receive = {
    case Terminated(x) if producerActorCount == 1 && producerListRef.contains(x) =>
      consumerRef ! KillConsumer

    case Terminated(x) if producerListRef.contains(x) =>
      producerActorCount -= 1

    case Terminated(`consumerRef`) =>
      println("Killing ProducerWatchers On Consumer End")
      context.stop(self)

    case _ => println("Dropping Message")
  }

  override def postStop(): Unit = {
    println("Post Stop ProducerWatchers")
    super.postStop()
  }
}

object ProducerWatchers {
  def apply(producerListRef: List[ActorRef], consumerRef: ActorRef) : Props = Props(new ProducerWatchers(producerListRef, consumerRef))
}

object AkkaActorKill {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("AkkaActorKill")
    implicit val timeout: Timeout = Timeout(10 seconds)

    val consumerRef = actorSystem.actorOf(Props[Consumer], "Consumer")
    val producer1 = actorSystem.actorOf(Props[Producer], name = "Producer1")
    val producer2 = actorSystem.actorOf(Props[Producer], name = "Producer2")
    val producer3 = actorSystem.actorOf(Props[Producer], name = "Producer3")

    val producerWatchers = actorSystem.actorOf(ProducerWatchers(List[ActorRef](producer1, producer2, producer3), consumerRef),"ProducerWatchers")

    producer1 ! PoisonPill
    producer2 ! PoisonPill
    producer3 ! PoisonPill

    Thread.sleep(5000)
    actorSystem.terminate
  }
}

可以使用ProducerWatchersactor来实现,它管理被杀死的生产者,一旦所有的生产者都被杀死,你可以杀死消费者actor,然后是ProducerWatchers actor。

一个演员可以简单地通过多次调用 context.watch 来观看多个演员,每次调用都传递不同的 ActorRef。例如,您的 Consumer 演员可以通过以下方式观看 Producer 演员的终止:

case class WatchMe(ref: ActorRef)

class Consumer extends Actor {
  var watched = Set[ActorRef]()

  def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched = watched + ref
    case Terminated(ref) =>
      watched = watched - ref
      if (watched.isEmpty) self ! PoisonPill
    // case ...
  }
}

两个 Producer 参与者都会将各自的引用发送给 Consumer,后者随后会监控 Producer 参与者的终止。当 Producer actor 都终止时,Consumer 会向自己发送一个 PoisonPill。因为 PoisonPill is treated like a normal message in an actor's mailboxConsumer 将在处理 PoisonPill 并自行关闭之前处理任何已经入队的消息。

Akka 文档中提到的 Derek Wyatt's "Shutdown Patterns in Akka 2" blog post 中描述了类似的模式。

所以我最终采用的解决方案受到 Derek Wyatt's terminator pattern

的启发
val shutdownFut = Future.sequence(
  Seq(
    gracefulStop(producerA, ProducerShutdownWaitSeconds seconds),
    gracefulStop(producerB, ProducerShutdownWaitSeconds seconds),
  )
).flatMap(_ => gracefulStop(consumer, ConsumerShutdownWaitSeconds seconds))

Await.result(shutdownFut, (ProducerShutdownWaitSeconds seconds) + (ConsumerShutdownWaitSeconds seconds))

这或多或少正是我想要的。消费者关闭等待生产者根据期货的履行关闭。此外,整个关闭本身会导致您可以等待的未来,因此能够使线程保持足够长的时间以正确清理所有内容。