从路由器中删除路由并获得毒丸?

Remove a route from a routee and get poison pill?

我有一个 actor 将消息路由到一组其他 actor,这些 actor 在不稳定的服务之上充当包装器。到目前为止,一切都很好,但我希望能够控制有多少参与者存在于该服务上(因为它们可能代表套接字连接或其他物理属性)所以能够管理它们的缩放会很好。

我看到路由器上有一个删除路由的方法,它确实删除了路由,但是有没有办法在删除之前先向我的 child 演员发送毒丸?文档说以这种方式删除路由时应该会发出毒丸消息,但我没有看到这种情况发生。

我有这样的代码

final Collection<Routee> routees = JavaConversions.asJavaCollection(router.routees());

for (final Routee routee : routees.stream()
                                  .limit(numberToRemove)
                                  .collect(toList())) {

    router = router.removeRoutee(routee);
}

看来我忽略了一个事实,即我必须手动发送毒丸才能停止我的路由器。这是一个完整的 Scala 演示应用程序

import akka.actor._
import akka.routing._

case class Add()

case class Remove()

class Worker(id: Integer) extends UntypedActor {
  println(s"Made worker $id")

  @throws[Exception](classOf[Exception]) override
  def preStart(): Unit = {
    println(s"Starting $id")
  }

  @throws[Exception](classOf[Exception]) override
  def postStop(): Unit = {
    println(s"Stopping $id")
  }

  @throws[Exception](classOf[Exception])
  override def onReceive(message: Any): Unit = message match {
    case _ => println(s"Message received on actor $id")
  }
}

class Master extends Actor {

  var count = 0

  def makeWorker() = {
    val id = count

    count = count + 1

    context.actorOf(Props(new Worker(id)))
  }

  var activeWorkers = Seq.fill(2) {
    makeWorker()
  }

  var router = Router(RoundRobinRoutingLogic(), activeWorkers.map(r => {
    context watch r
    ActorRefRoutee(r)
  }).toIndexedSeq)

  def receive = {
    case Remove =>
      println("Removing route")

      val head = router.routees.head.asInstanceOf[ActorRefRoutee].ref

      head ! PoisonPill

      context unwatch head

      router = router.removeRoutee(head)

      printRoutes()


    case Add =>
      println("Adding route")

      val worker = makeWorker()

      context watch worker

      router = router.addRoutee(worker)

      printRoutes()


    case w: AnyRef =>

      printRoutes()

      router.route(w, sender())
  }

  def printRoutes(): Unit ={
    val size = router.routees.size

    println(s"Total routes $size")
  }
}

object Main extends App {
  var system = ActorSystem.create("foo")

  var master = system.actorOf(Props[Master])

  master ! "foo"

  master ! Remove

  master ! "foo"

  master ! "bar"

  master ! Add

  master ! "biz"
}