在 actors 的 preStart 中处理异常
Handling exceptions in preStart in actors
我有一个服务有主管负责构建子 actor 并处理他们的异常。
ServiceMain -> Supervisor -> DiscoveryActor
要构建 DiscoveryActor
我调用以下内容
Await.result(supervisor ? (Props[Discovery], "Discovery"), Duration.create(60, SECONDS)) match {
case actor: ActorRef =>
discoveryActor = actor
case ex: Exception =>
logger.error("Failed to initialize discovery actor", ex)
sys.exit(1)
}
在 Supervisor
中由这段代码处理
def receive = {
case p: Props => sender() ! context.actorOf(p)
case (p: Props, s: String) => sender() ! context.actorOf(p, s)
}
如果 DiscoveryActor
无法按配置研究主机,则应在 preStart()
中抛出异常。异常ApiConnectionException
被actor抛出,被akka捕获,变成了ActorInitializationException
.
我已经厌倦了在 Await 中捕获这个异常并使用如下的 supervisorStrategy
override val supervisorStrategy =
AllForOneStrategy() {
case _: Exception => Escalate
}
但是这些都没有成功捕获它,我试图做的是捕获这样的异常并退出应用程序。
如果有人能指出我哪里出错或遗漏了什么,我将不胜感激!
我稍微简化了您的代码,以便直接找到问题的根源。您可以将其复制并粘贴到您的编辑器中。它使用 ScalaTest 套件。
SupervisorStrategy
在 Supervisor
actor 中定义 确实 捕获 Discovery
actor 在 preStart
方法中抛出的异常。您可能想仔细查看您自己的代码。
您的 Await
块正在尝试捕获异常,但在这种情况下是不可能的。 Exception
由 Discovery
参与者抛出,而不是作为消息发送。不过,您使用的询问模式 (?) 只是等待消息到达。只有使用 SupervisorStrategy
才能让您返回抛出的异常。除了在 Supervisor
中升级异常之外,您还可以向应用程序 Guardian actor 发送一条消息,说明初始化失败,因此应用程序应该退出。或者直接在 Supervisor
中进行。
import java.util.concurrent.TimeUnit
import akka.actor.SupervisorStrategy.Escalate
import akka.actor._
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}
import scala.concurrent.Await
abstract class ActorSuite(systemName: String)
extends TestKit(ActorSystem(systemName))
with FunSuiteLike
with ImplicitSender
with Matchers
with BeforeAndAfterAll {
override def afterAll {
TestKit.shutdownActorSystem(system)
}
}
class FailingActorInitializationSuite extends ActorSuite("failing-system") {
test("run it") {
val supervisor = system.actorOf(Props[Supervisor])
var discoveryActor: ActorRef = null
implicit val timeout = Timeout(60, TimeUnit.SECONDS)
Await.result(
supervisor ?(Props[Discovery], "Discovery"), timeout.duration) match {
case actor: ActorRef =>
discoveryActor = actor
}
}
}
class Supervisor extends Actor with ActorLogging {
override val supervisorStrategy =
AllForOneStrategy() {
case e: Exception =>
log.error(s"Caught an exception [${e.getCause.getMessage}] and escalating")
Escalate
}
override def receive: Receive = {
case (p: Props, s: String) => sender() ! context.actorOf(p, s)
}
}
class Discovery extends Actor {
override def preStart(): Unit = {
super.preStart()
throw new RuntimeException("Can't create")
}
override def receive: Actor.Receive = {
case _ =>
}
}
我有一个服务有主管负责构建子 actor 并处理他们的异常。
ServiceMain -> Supervisor -> DiscoveryActor
要构建 DiscoveryActor
我调用以下内容
Await.result(supervisor ? (Props[Discovery], "Discovery"), Duration.create(60, SECONDS)) match {
case actor: ActorRef =>
discoveryActor = actor
case ex: Exception =>
logger.error("Failed to initialize discovery actor", ex)
sys.exit(1)
}
在 Supervisor
def receive = {
case p: Props => sender() ! context.actorOf(p)
case (p: Props, s: String) => sender() ! context.actorOf(p, s)
}
如果 DiscoveryActor
无法按配置研究主机,则应在 preStart()
中抛出异常。异常ApiConnectionException
被actor抛出,被akka捕获,变成了ActorInitializationException
.
我已经厌倦了在 Await 中捕获这个异常并使用如下的 supervisorStrategy
override val supervisorStrategy =
AllForOneStrategy() {
case _: Exception => Escalate
}
但是这些都没有成功捕获它,我试图做的是捕获这样的异常并退出应用程序。
如果有人能指出我哪里出错或遗漏了什么,我将不胜感激!
我稍微简化了您的代码,以便直接找到问题的根源。您可以将其复制并粘贴到您的编辑器中。它使用 ScalaTest 套件。
SupervisorStrategy
在 Supervisor
actor 中定义 确实 捕获 Discovery
actor 在 preStart
方法中抛出的异常。您可能想仔细查看您自己的代码。
您的 Await
块正在尝试捕获异常,但在这种情况下是不可能的。 Exception
由 Discovery
参与者抛出,而不是作为消息发送。不过,您使用的询问模式 (?) 只是等待消息到达。只有使用 SupervisorStrategy
才能让您返回抛出的异常。除了在 Supervisor
中升级异常之外,您还可以向应用程序 Guardian actor 发送一条消息,说明初始化失败,因此应用程序应该退出。或者直接在 Supervisor
中进行。
import java.util.concurrent.TimeUnit
import akka.actor.SupervisorStrategy.Escalate
import akka.actor._
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}
import scala.concurrent.Await
abstract class ActorSuite(systemName: String)
extends TestKit(ActorSystem(systemName))
with FunSuiteLike
with ImplicitSender
with Matchers
with BeforeAndAfterAll {
override def afterAll {
TestKit.shutdownActorSystem(system)
}
}
class FailingActorInitializationSuite extends ActorSuite("failing-system") {
test("run it") {
val supervisor = system.actorOf(Props[Supervisor])
var discoveryActor: ActorRef = null
implicit val timeout = Timeout(60, TimeUnit.SECONDS)
Await.result(
supervisor ?(Props[Discovery], "Discovery"), timeout.duration) match {
case actor: ActorRef =>
discoveryActor = actor
}
}
}
class Supervisor extends Actor with ActorLogging {
override val supervisorStrategy =
AllForOneStrategy() {
case e: Exception =>
log.error(s"Caught an exception [${e.getCause.getMessage}] and escalating")
Escalate
}
override def receive: Receive = {
case (p: Props, s: String) => sender() ! context.actorOf(p, s)
}
}
class Discovery extends Actor {
override def preStart(): Unit = {
super.preStart()
throw new RuntimeException("Can't create")
}
override def receive: Actor.Receive = {
case _ =>
}
}