如何观看多个 akka 演员终止
how to watch multiple akka actors for termination
我有 akka 系统,它基本上是两个生产者 actor 向一个消费者 actor 发送消息。在简化形式中,我有这样的东西:
class ProducerA extends Actor {
def receive = {
case Produce => Consumer ! generateMessageA()
}
... more code ...
}
class ProducerB extends Actor {
def receive = {
case Produce => Consumer ! generateMessageB()
}
... more code ...
}
class Consumer extends Actor {
def receive = {
case A => handleMessageA(A)
case B => handleMessageB(B)
}
... more code ...
}
而且他们都是同一个akka系统的兄弟。
我正在想办法优雅地终止这个系统。这意味着在关闭时我希望 ProducerA
和 ProducerB
立即停止然后我希望 Consumer
完成处理消息队列中剩余的任何消息然后关闭。
看来我想要的是 Consumer
演员能够看到 ProducerA
和 ProducerB
的终止。或者一般来说,我想要的似乎是能够在两个生产者都停止后向 Consumer
发送一条 PoisonPill
消息。
https://alvinalexander.com/scala/how-to-monitor-akka-actor-death-with-watch-method
上面的教程很好地解释了一个 actor 如何监视另一个 actor 的终止,但不确定一个 actor 如何监视多个 actor 的终止。
import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration.DurationInt
class Producer extends Actor {
def receive = {
case _ => println("Producer received a message")
}
}
case object KillConsumer
class Consumer extends Actor {
def receive = {
case KillConsumer =>
println("Stopping Consumer After All Producers")
context.stop(self)
case _ => println("Parent received a message")
}
override def postStop(): Unit = {
println("Post Stop Consumer")
super.postStop()
}
}
class ProducerWatchers(producerListRef: List[ActorRef], consumerRef: ActorRef) extends Actor {
producerListRef.foreach(x => context.watch(x))
context.watch(consumerRef)
var producerActorCount = producerListRef.length
implicit val timeout: Timeout = Timeout(5 seconds)
override def receive: Receive = {
case Terminated(x) if producerActorCount == 1 && producerListRef.contains(x) =>
consumerRef ! KillConsumer
case Terminated(x) if producerListRef.contains(x) =>
producerActorCount -= 1
case Terminated(`consumerRef`) =>
println("Killing ProducerWatchers On Consumer End")
context.stop(self)
case _ => println("Dropping Message")
}
override def postStop(): Unit = {
println("Post Stop ProducerWatchers")
super.postStop()
}
}
object ProducerWatchers {
def apply(producerListRef: List[ActorRef], consumerRef: ActorRef) : Props = Props(new ProducerWatchers(producerListRef, consumerRef))
}
object AkkaActorKill {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("AkkaActorKill")
implicit val timeout: Timeout = Timeout(10 seconds)
val consumerRef = actorSystem.actorOf(Props[Consumer], "Consumer")
val producer1 = actorSystem.actorOf(Props[Producer], name = "Producer1")
val producer2 = actorSystem.actorOf(Props[Producer], name = "Producer2")
val producer3 = actorSystem.actorOf(Props[Producer], name = "Producer3")
val producerWatchers = actorSystem.actorOf(ProducerWatchers(List[ActorRef](producer1, producer2, producer3), consumerRef),"ProducerWatchers")
producer1 ! PoisonPill
producer2 ! PoisonPill
producer3 ! PoisonPill
Thread.sleep(5000)
actorSystem.terminate
}
}
可以使用ProducerWatchersactor来实现,它管理被杀死的生产者,一旦所有的生产者都被杀死,你可以杀死消费者actor,然后是ProducerWatchers actor。
一个演员可以简单地通过多次调用 context.watch
来观看多个演员,每次调用都传递不同的 ActorRef
。例如,您的 Consumer
演员可以通过以下方式观看 Producer
演员的终止:
case class WatchMe(ref: ActorRef)
class Consumer extends Actor {
var watched = Set[ActorRef]()
def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched = watched + ref
case Terminated(ref) =>
watched = watched - ref
if (watched.isEmpty) self ! PoisonPill
// case ...
}
}
两个 Producer
参与者都会将各自的引用发送给 Consumer
,后者随后会监控 Producer
参与者的终止。当 Producer
actor 都终止时,Consumer
会向自己发送一个 PoisonPill
。因为 PoisonPill
is treated like a normal message in an actor's mailbox,Consumer
将在处理 PoisonPill
并自行关闭之前处理任何已经入队的消息。
Akka 文档中提到的 Derek Wyatt's "Shutdown Patterns in Akka 2" blog post 中描述了类似的模式。
所以我最终采用的解决方案受到 Derek Wyatt's terminator pattern
的启发
val shutdownFut = Future.sequence(
Seq(
gracefulStop(producerA, ProducerShutdownWaitSeconds seconds),
gracefulStop(producerB, ProducerShutdownWaitSeconds seconds),
)
).flatMap(_ => gracefulStop(consumer, ConsumerShutdownWaitSeconds seconds))
Await.result(shutdownFut, (ProducerShutdownWaitSeconds seconds) + (ConsumerShutdownWaitSeconds seconds))
这或多或少正是我想要的。消费者关闭等待生产者根据期货的履行关闭。此外,整个关闭本身会导致您可以等待的未来,因此能够使线程保持足够长的时间以正确清理所有内容。
我有 akka 系统,它基本上是两个生产者 actor 向一个消费者 actor 发送消息。在简化形式中,我有这样的东西:
class ProducerA extends Actor {
def receive = {
case Produce => Consumer ! generateMessageA()
}
... more code ...
}
class ProducerB extends Actor {
def receive = {
case Produce => Consumer ! generateMessageB()
}
... more code ...
}
class Consumer extends Actor {
def receive = {
case A => handleMessageA(A)
case B => handleMessageB(B)
}
... more code ...
}
而且他们都是同一个akka系统的兄弟。
我正在想办法优雅地终止这个系统。这意味着在关闭时我希望 ProducerA
和 ProducerB
立即停止然后我希望 Consumer
完成处理消息队列中剩余的任何消息然后关闭。
看来我想要的是 Consumer
演员能够看到 ProducerA
和 ProducerB
的终止。或者一般来说,我想要的似乎是能够在两个生产者都停止后向 Consumer
发送一条 PoisonPill
消息。
https://alvinalexander.com/scala/how-to-monitor-akka-actor-death-with-watch-method
上面的教程很好地解释了一个 actor 如何监视另一个 actor 的终止,但不确定一个 actor 如何监视多个 actor 的终止。
import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration.DurationInt
class Producer extends Actor {
def receive = {
case _ => println("Producer received a message")
}
}
case object KillConsumer
class Consumer extends Actor {
def receive = {
case KillConsumer =>
println("Stopping Consumer After All Producers")
context.stop(self)
case _ => println("Parent received a message")
}
override def postStop(): Unit = {
println("Post Stop Consumer")
super.postStop()
}
}
class ProducerWatchers(producerListRef: List[ActorRef], consumerRef: ActorRef) extends Actor {
producerListRef.foreach(x => context.watch(x))
context.watch(consumerRef)
var producerActorCount = producerListRef.length
implicit val timeout: Timeout = Timeout(5 seconds)
override def receive: Receive = {
case Terminated(x) if producerActorCount == 1 && producerListRef.contains(x) =>
consumerRef ! KillConsumer
case Terminated(x) if producerListRef.contains(x) =>
producerActorCount -= 1
case Terminated(`consumerRef`) =>
println("Killing ProducerWatchers On Consumer End")
context.stop(self)
case _ => println("Dropping Message")
}
override def postStop(): Unit = {
println("Post Stop ProducerWatchers")
super.postStop()
}
}
object ProducerWatchers {
def apply(producerListRef: List[ActorRef], consumerRef: ActorRef) : Props = Props(new ProducerWatchers(producerListRef, consumerRef))
}
object AkkaActorKill {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("AkkaActorKill")
implicit val timeout: Timeout = Timeout(10 seconds)
val consumerRef = actorSystem.actorOf(Props[Consumer], "Consumer")
val producer1 = actorSystem.actorOf(Props[Producer], name = "Producer1")
val producer2 = actorSystem.actorOf(Props[Producer], name = "Producer2")
val producer3 = actorSystem.actorOf(Props[Producer], name = "Producer3")
val producerWatchers = actorSystem.actorOf(ProducerWatchers(List[ActorRef](producer1, producer2, producer3), consumerRef),"ProducerWatchers")
producer1 ! PoisonPill
producer2 ! PoisonPill
producer3 ! PoisonPill
Thread.sleep(5000)
actorSystem.terminate
}
}
可以使用ProducerWatchersactor来实现,它管理被杀死的生产者,一旦所有的生产者都被杀死,你可以杀死消费者actor,然后是ProducerWatchers actor。
一个演员可以简单地通过多次调用 context.watch
来观看多个演员,每次调用都传递不同的 ActorRef
。例如,您的 Consumer
演员可以通过以下方式观看 Producer
演员的终止:
case class WatchMe(ref: ActorRef)
class Consumer extends Actor {
var watched = Set[ActorRef]()
def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched = watched + ref
case Terminated(ref) =>
watched = watched - ref
if (watched.isEmpty) self ! PoisonPill
// case ...
}
}
两个 Producer
参与者都会将各自的引用发送给 Consumer
,后者随后会监控 Producer
参与者的终止。当 Producer
actor 都终止时,Consumer
会向自己发送一个 PoisonPill
。因为 PoisonPill
is treated like a normal message in an actor's mailbox,Consumer
将在处理 PoisonPill
并自行关闭之前处理任何已经入队的消息。
Akka 文档中提到的 Derek Wyatt's "Shutdown Patterns in Akka 2" blog post 中描述了类似的模式。
所以我最终采用的解决方案受到 Derek Wyatt's terminator pattern
的启发val shutdownFut = Future.sequence(
Seq(
gracefulStop(producerA, ProducerShutdownWaitSeconds seconds),
gracefulStop(producerB, ProducerShutdownWaitSeconds seconds),
)
).flatMap(_ => gracefulStop(consumer, ConsumerShutdownWaitSeconds seconds))
Await.result(shutdownFut, (ProducerShutdownWaitSeconds seconds) + (ConsumerShutdownWaitSeconds seconds))
这或多或少正是我想要的。消费者关闭等待生产者根据期货的履行关闭。此外,整个关闭本身会导致您可以等待的未来,因此能够使线程保持足够长的时间以正确清理所有内容。