在 Akka 中等待一组 Actor 响应的好方法是什么?

What's a good way in Akka to wait for a group of Actors to respond?

在 Akka 中,我想向集群中的参与者发送 "status" 消息以了解他们的状态。这些演员可能处于各种健康状态,包括 dead/unable 响应。

我想等待一段时间,比如 10 秒,然后继续处理我在该时限内碰巧收到的任何结果。我不想让整个事情失败,因为 1 或 2 有问题并且在 10 秒内没有 responded/timed-out。

我试过这个:

object GetStats {
  def unapply(n: ActorRef )(implicit system: ActorSystem): Option[Future[Any]] = Try {
    implicit val t: Timeout = Timeout(10 seconds)
    n ? "A" 
  }.toOption
}
...
val z = List(a,b,c,d)  // where a-d are ActorRefs to nodes I want to status
val q = z.collect {
   case GetStats(s) => s
}
// OK, so here 'q' is a List[Future[Any]]
val allInverted = Future.sequence(q) // now we have Future[List[Any]]
val ok =  Await.result(allInverted, 10 seconds).asInstanceOf[List[String]]
println(ok)

此代码的问题在于,如果 1 个或多个不响应,它似乎会抛出 TimeoutException。然后我无法获得返回的回复。

假设,您确实需要每 10 秒收集至少部分统计信息 - 解决方案是将 "not responding" 转换为实际失败。

为了实现这一点,与 implicit val t:Timeout 相比,只需稍微增加 Await 超时。之后你的期货本身(从 ? 返回)将更早失败。所以你可以 recover 他们:

// Works only when AskTimeout >> AwaitTimeout
val qfiltered = q.map(_.map(Some(_)).recover{case _ => None}) //it's better to match TimeoutException here instead of `_`
val allInverted = Future.sequence(q).map(_.flatten)

示例:

scala> class MyActor extends Actor{ def receive = {case 1 => sender ! 2; case _ =>}}
defined class MyActor

scala> val a = sys.actorOf(Props[MyActor])
a: akka.actor.ActorRef = Actor[akka://1/user/$c#1361310022]

scala> implicit val t: Timeout = Timeout(1 seconds)
t: akka.util.Timeout = Timeout(1 second)

scala> val l = List(a ? 1, a ? 100500).map(_.map(Some(_)).recover{case _ => None})
l: List[scala.concurrent.Future[Option[Any]]] = List(scala.concurrent.impl.Promise$DefaultPromise@7faaa183, scala.concurrent.impl.Promise$DefaultPromise@1b51e0f0)

scala> Await.result(Future.sequence(l).map(_.flatten), 3 seconds)
warning: there were 1 feature warning(s); re-run with -feature for details
res29: List[Any] = List(2)

如果您想知道哪个 Future 没有响应 - 删除 flatten

接收部分响应应该足以持续收集统计信息,就好像某个工作人员没有及时响应一样——下次它会用实际数据响应并且不会丢失任何数据。但是您应该正确地处理 worker 的生命周期,而不是丢失(如果重要的话)actor 本身内部的任何数据。

如果超时的原因只是系统压力大-您可以考虑:

  • 工人的独立游泳池
  • 背压
  • 缓存输入请求(当系统过载时)。

如果此类超时的原因是某些远程存储 - 如果客户端已准备好,则部分响应是处理它的正确方法。例如,WebUI 可能会警告用户显示的数据可能未满,使用一些旋转的东西。但是不要忘记不要用存储请求阻止 actor(futures 可能会有所帮助)或者至少将它们移动到单独的线程池中。

如果 worker actor 由于失败(如异常)而没有响应 - 你仍然可以从你的 preRestart 向发件人发送通知 - 这样你也可以收到没有来自 worker 的统计信息的原因。这里唯一的事情 - 你应该检查发件人是否可用 (it may not be)

P.S。我希望您不要在某些 actor 中执行 Await.result - 至少为了您的应用程序性能,不建议阻止 actor。在某些情况下,它甚至可能导致死锁或内存泄漏。所以 await 应该放在你系统的外观中的某个地方(如果底层框架不支持期货)。

因此异步处理您的答案可能有意义(如果某些参与者没有响应,您仍然需要从失败中恢复它们):

 //actor:
 val parent = sender
 for(list <- Future.sequence(qfiltered)) {
     parent ! process(list)
 }

 //in sender (outside of the actors):
 Await(actor ? Get, 10 seconds)