从路由器中删除路由并获得毒丸?
Remove a route from a routee and get poison pill?
我有一个 actor 将消息路由到一组其他 actor,这些 actor 在不稳定的服务之上充当包装器。到目前为止,一切都很好,但我希望能够控制有多少参与者存在于该服务上(因为它们可能代表套接字连接或其他物理属性)所以能够管理它们的缩放会很好。
我看到路由器上有一个删除路由的方法,它确实删除了路由,但是有没有办法在删除之前先向我的 child 演员发送毒丸?文档说以这种方式删除路由时应该会发出毒丸消息,但我没有看到这种情况发生。
我有这样的代码
final Collection<Routee> routees = JavaConversions.asJavaCollection(router.routees());
for (final Routee routee : routees.stream()
.limit(numberToRemove)
.collect(toList())) {
router = router.removeRoutee(routee);
}
看来我忽略了一个事实,即我必须手动发送毒丸才能停止我的路由器。这是一个完整的 Scala 演示应用程序
import akka.actor._
import akka.routing._
case class Add()
case class Remove()
class Worker(id: Integer) extends UntypedActor {
println(s"Made worker $id")
@throws[Exception](classOf[Exception]) override
def preStart(): Unit = {
println(s"Starting $id")
}
@throws[Exception](classOf[Exception]) override
def postStop(): Unit = {
println(s"Stopping $id")
}
@throws[Exception](classOf[Exception])
override def onReceive(message: Any): Unit = message match {
case _ => println(s"Message received on actor $id")
}
}
class Master extends Actor {
var count = 0
def makeWorker() = {
val id = count
count = count + 1
context.actorOf(Props(new Worker(id)))
}
var activeWorkers = Seq.fill(2) {
makeWorker()
}
var router = Router(RoundRobinRoutingLogic(), activeWorkers.map(r => {
context watch r
ActorRefRoutee(r)
}).toIndexedSeq)
def receive = {
case Remove =>
println("Removing route")
val head = router.routees.head.asInstanceOf[ActorRefRoutee].ref
head ! PoisonPill
context unwatch head
router = router.removeRoutee(head)
printRoutes()
case Add =>
println("Adding route")
val worker = makeWorker()
context watch worker
router = router.addRoutee(worker)
printRoutes()
case w: AnyRef =>
printRoutes()
router.route(w, sender())
}
def printRoutes(): Unit ={
val size = router.routees.size
println(s"Total routes $size")
}
}
object Main extends App {
var system = ActorSystem.create("foo")
var master = system.actorOf(Props[Master])
master ! "foo"
master ! Remove
master ! "foo"
master ! "bar"
master ! Add
master ! "biz"
}
我有一个 actor 将消息路由到一组其他 actor,这些 actor 在不稳定的服务之上充当包装器。到目前为止,一切都很好,但我希望能够控制有多少参与者存在于该服务上(因为它们可能代表套接字连接或其他物理属性)所以能够管理它们的缩放会很好。
我看到路由器上有一个删除路由的方法,它确实删除了路由,但是有没有办法在删除之前先向我的 child 演员发送毒丸?文档说以这种方式删除路由时应该会发出毒丸消息,但我没有看到这种情况发生。
我有这样的代码
final Collection<Routee> routees = JavaConversions.asJavaCollection(router.routees());
for (final Routee routee : routees.stream()
.limit(numberToRemove)
.collect(toList())) {
router = router.removeRoutee(routee);
}
看来我忽略了一个事实,即我必须手动发送毒丸才能停止我的路由器。这是一个完整的 Scala 演示应用程序
import akka.actor._
import akka.routing._
case class Add()
case class Remove()
class Worker(id: Integer) extends UntypedActor {
println(s"Made worker $id")
@throws[Exception](classOf[Exception]) override
def preStart(): Unit = {
println(s"Starting $id")
}
@throws[Exception](classOf[Exception]) override
def postStop(): Unit = {
println(s"Stopping $id")
}
@throws[Exception](classOf[Exception])
override def onReceive(message: Any): Unit = message match {
case _ => println(s"Message received on actor $id")
}
}
class Master extends Actor {
var count = 0
def makeWorker() = {
val id = count
count = count + 1
context.actorOf(Props(new Worker(id)))
}
var activeWorkers = Seq.fill(2) {
makeWorker()
}
var router = Router(RoundRobinRoutingLogic(), activeWorkers.map(r => {
context watch r
ActorRefRoutee(r)
}).toIndexedSeq)
def receive = {
case Remove =>
println("Removing route")
val head = router.routees.head.asInstanceOf[ActorRefRoutee].ref
head ! PoisonPill
context unwatch head
router = router.removeRoutee(head)
printRoutes()
case Add =>
println("Adding route")
val worker = makeWorker()
context watch worker
router = router.addRoutee(worker)
printRoutes()
case w: AnyRef =>
printRoutes()
router.route(w, sender())
}
def printRoutes(): Unit ={
val size = router.routees.size
println(s"Total routes $size")
}
}
object Main extends App {
var system = ActorSystem.create("foo")
var master = system.actorOf(Props[Master])
master ! "foo"
master ! Remove
master ! "foo"
master ! "bar"
master ! Add
master ! "biz"
}