在 actors 的 preStart 中处理异常

Handling exceptions in preStart in actors

我有一个服务有主管负责构建子 actor 并处理他们的异常。

ServiceMain -> Supervisor -> DiscoveryActor

要构建 DiscoveryActor 我调用以下内容

Await.result(supervisor ? (Props[Discovery], "Discovery"), Duration.create(60, SECONDS)) match {
    case actor: ActorRef =>
      discoveryActor = actor

    case ex: Exception =>
      logger.error("Failed to initialize discovery actor", ex)
      sys.exit(1)
}

Supervisor

中由这段代码处理
def receive = {
  case p: Props => sender() ! context.actorOf(p)

  case (p: Props, s: String) => sender() ! context.actorOf(p, s)
}

如果 DiscoveryActor 无法按配置研究主机,则应在 preStart() 中抛出异常。异常ApiConnectionException被actor抛出,被akka捕获,变成了ActorInitializationException.

我已经厌倦了在 Await 中捕获这个异常并使用如下的 supervisorStrategy

override val supervisorStrategy =
  AllForOneStrategy() {
    case _: Exception                =>       Escalate
  }

但是这些都没有成功捕获它,我试图做的是捕获这样的异常并退出应用程序。

如果有人能指出我哪里出错或遗漏了什么,我将不胜感激!

我稍微简化了您的代码,以便直接找到问题的根源。您可以将其复制并粘贴到您的编辑器中。它使用 ScalaTest 套件。

SupervisorStrategySupervisor actor 中定义 确实 捕获 Discovery actor 在 preStart 方法中抛出的异常。您可能想仔细查看您自己的代码。

您的 Await 块正在尝试捕获异常,但在这种情况下是不可能的。 ExceptionDiscovery 参与者抛出,而不是作为消息发送。不过,您使用的询问模式 (?) 只是等待消息到达。只有使用 SupervisorStrategy 才能让您返回抛出的异常。除了在 Supervisor 中升级异常之外,您还可以向应用程序 Guardian actor 发送一条消息,说明初始化失败,因此应用程序应该退出。或者直接在 Supervisor 中进行。

import java.util.concurrent.TimeUnit    
import akka.actor.SupervisorStrategy.Escalate
import akka.actor._
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}

import scala.concurrent.Await

abstract class ActorSuite(systemName: String)
  extends TestKit(ActorSystem(systemName))
  with FunSuiteLike
  with ImplicitSender
  with Matchers
  with BeforeAndAfterAll {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }
}

class FailingActorInitializationSuite extends ActorSuite("failing-system") {


  test("run it") {

    val supervisor = system.actorOf(Props[Supervisor])
    var discoveryActor: ActorRef = null

    implicit val timeout = Timeout(60, TimeUnit.SECONDS)

    Await.result(
      supervisor ?(Props[Discovery], "Discovery"), timeout.duration) match {
      case actor: ActorRef =>
        discoveryActor = actor
    }
  }
}

class Supervisor extends Actor with ActorLogging {

  override val supervisorStrategy =
    AllForOneStrategy() {
      case e: Exception =>
        log.error(s"Caught an exception [${e.getCause.getMessage}] and escalating")
        Escalate
    }

  override def receive: Receive = {
    case (p: Props, s: String) => sender() ! context.actorOf(p, s)
  }
}

class Discovery extends Actor {

  override def preStart(): Unit = {
    super.preStart()
    throw new RuntimeException("Can't create")
  }

  override def receive: Actor.Receive = {
    case _ =>
  }
}