Replacing workers in BalancingPool

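I am using an akka BalancingPool to distribute tasks among workers. It works well until I add/remove workers in the pool. I want to do this because some workers are unreliable and perform poorly. However, after the replacement the balancing pool sends all messages to only one worker.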

Here is a Scala test for this:

import scala.concurrent.duration._
import org.scalatest._
import akka.util.Timeout
import akka.actor._
import akka.routing._ 
import akka.testkit._

class BalancingPoolSpec extends TestKit(ActorSystem("BalancingPoolSpec")) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  val numberOfTestMessages = 5
  val numberOfWorkers = 3
  val pool = system.actorOf(BalancingPool(numberOfWorkers).props(Props[Worker]), "pool")

  def sendMessagesAndCollectStatistic = {
    for (i <- 1 to numberOfTestMessages) pool ! "task"
    (currentRoutes, collectResponces)
  }

  def collectResponces = receiveN(numberOfTestMessages, 10.second).groupBy(l => l).map(t => (t._1, t._2.length))

  def currentRoutes = {
    pool ! GetRoutees
    val Routees(routees) = expectMsgAnyClassOf(classOf[Routees])
    routees
  }

  def replaceWorkers(oldRoutees: Seq[Routee]) = {
    //Adding new Routees before removing old ones to make it work :)
    for (i <- 1 to numberOfWorkers) pool ! AddRoutee(ActorRefRoutee(system.actorOf(Props[Worker])))
    for (r <- oldRoutees) pool ! RemoveRoutee(r)
    Thread.sleep(500) //Give some time to BalancingPool
  }

  "test" in {
    val (routees1, responces1) = sendMessagesAndCollectStatistic
    replaceWorkers(routees1)
    val (routees2, responces2) = sendMessagesAndCollectStatistic

    assert(responces2.size > 1 , s"""
      Before replacement distribution over ${routees1.size} workers: ${responces1}
      After replacement distribution over ${routees2.size} workers: ${responces2}""")
  } 
}


// For each task, the worker simulates 1 second of work and then replies to the sender with its own id
object Worker {
  var i = 0
  def newId = synchronized {
    i += 1
    i  
  } 
}

class Worker extends Actor {
  val id = Worker.newId
  def receive = {
    case _ => Thread.sleep(1000); sender ! id
  }
}

Failure message:

1 was not greater than 1
     Before replacement distribution over 3 workers: Map(2 -> 2, 1 -> 1, 3 -> 2)
     After replacement distribution over 3 workers: Map(4 -> 5)

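So before the replacement the tasks were distributed over 3 workers, but after it all 5 tasks went to a single worker. Is BalancingPool supposed to handle AddRoutee/RemoveRoutee messages in the expected way?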

Answer from Patrik Nordwall in the akka user group:

The reason is that the BalancingPool uses a special dispatcher when it creates the routees. Here you create and add the routees without that. Try this instead:

def replaceWorkers(oldRoutees: Seq[Routee]): Unit = {
  pool ! AdjustPoolSize(-numberOfWorkers)
  pool ! AdjustPoolSize(numberOfWorkers)
  Thread.sleep(500) // Give some time to BalancingPool
}

However, wouldn't it be better to let the routees throw an exception and thereby restart?
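
A minimal sketch of that restart-based alternative, under some assumptions: `FlakyWorker`, `restartingPool`, the pool name, and the "bad" message are hypothetical names introduced only for illustration. A pool router's default supervisor strategy escalates failures to the router's parent, so the sketch passes an explicit strategy that restarts only the failing routee. Because the routee is still a child created by the pool, it should keep the pool's dispatcher after the restart.

```scala
import akka.actor._
import akka.routing.BalancingPool

// Hypothetical worker: instead of limping along, it throws when it
// detects that it is unhealthy, and lets its supervisor decide what to do.
class FlakyWorker extends Actor {
  def receive = {
    case "bad" => throw new IllegalStateException("worker is unhealthy")
    case _     => sender() ! self.path.name
  }
}

object RestartingPoolSketch {
  // Restart only the routee that failed; other routees keep running.
  val restartFailingRoutee: SupervisorStrategy = OneForOneStrategy() {
    case _: Exception => SupervisorStrategy.Restart
  }

  // Builds a BalancingPool whose routees are restarted on failure
  // instead of being replaced by hand with AddRoutee/RemoveRoutee.
  def restartingPool(system: ActorSystem, numberOfWorkers: Int): ActorRef =
    system.actorOf(
      BalancingPool(numberOfWorkers)
        .withSupervisorStrategy(restartFailingRoutee)
        .props(Props[FlakyWorker]),
      "restarting-pool")
}
```

With this in place, the test could call `RestartingPoolSketch.restartingPool(system, numberOfWorkers)` instead of creating the pool directly; whether a restart is enough depends on why the workers are unreliable in the first place.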