How to resume an Akka actor that failed while processing a message it sent to itself?
I have the following example actor code:
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Resume}
import scala.concurrent.duration._

object SomeExternalDep {
  private var flag = true
  // this function will throw an exception once when called with the value 3, then it won't throw another exception
  @throws[RuntimeException]
  def potentiallyThrows(curr: Int): Unit = {
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
  }
}

class CountingActor(start: Int, end: Int)
    extends Actor
    with ActorLogging {
  var curr: Int = start

  // This method counts for us
  private def doCount(): Unit = {
    // This may throw, which will cause this actor to fail
    SomeExternalDep.potentiallyThrows(curr)
    // Send self a count message. If the above call throws, this is never reached
    if (curr <= end) {
      self ! CountingActor.Count(curr)
    }
  }

  override def receive: Receive = {
    case CountingActor.Start => doCount()
    case CountingActor.Count(n) =>
      log.info(s"Counting: $n")
      curr += 1
      doCount()
    case x => log.error(s"bad message $x")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(s"CountingActor failed while processing $message")
    self ! CountingActor.Start
  }
}

object CountingActor {
  def props(start: Int, end: Int): Props = Props(new CountingActor(start, end))
  case object Start
  case class Count(n: Int)
}

class SupervisorActor
    extends Actor
    with ActorLogging {
  var countingActor: ActorRef = _

  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      // case _: RuntimeException => Restart
      case _: RuntimeException => Resume
    }

  private def doStart(): Unit = {
    countingActor = context.actorOf(CountingActor.props(0, 5))
    countingActor ! CountingActor.Start
  }

  override def receive: Receive = {
    case SupervisorActor.Init => doStart()
    case _ => log.error(s"Supervisor doesn't process messages")
  }
}

object SupervisorActor {
  // Message that tells the supervisor to create and start its counting child
  case object Init
}
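Here, CountingActor basically keeps sending itself Count messages. Before it sends each one, it calls an external dependency that may fail. It also updates its internal state (curr) as it counts. I also implemented a simple SupervisorActor, which creates CountingActor as its child.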
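When the supervision strategy is set to Restart, I get the expected result. The actor counts 0, 1, 2 and then fails when the external dependency throws at 3. The preRestart hook sends a new Start message to the mailbox, so the restarted actor starts counting again from 0.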
[INFO] [07/10/2019 15:23:59.895] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 2
[ERROR] [07/10/2019 15:23:59.905] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency
java.lang.RuntimeException: Something went wrong in external dependency
at SomeExternalDep$.potentiallyThrows(ActorSupervisionTest.scala:15)
at CountingActor.CountingActor$$doCount(ActorSupervisionTest.scala:30)
<Stack Trace omitted>
[ERROR] [07/10/2019 15:23:59.909] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] CountingActor failed while processing Some(Count(2))
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 2
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 3
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 4
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 5
But when I change the supervision strategy to Resume, the actor gets stuck, because it fails before sending the next Count message:
[INFO] [07/10/2019 15:26:01.779] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 2
[WARN] [07/10/2019 15:26:01.786] [counting-sys-akka.actor.default-dispatcher-4] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency
How can I fix this so that counting continues from 3 after the external dependency fails?
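It looks like the core of your logic is basically a loop from start to end. In each iteration, you send yourself a message that triggers the next iteration. The problem is that when an exception is thrown, the message for the next iteration is never sent, and this is where the supervisor comes in. Restart is straightforward, because your preRestart hook runs the code that starts the loop again. With Resume, nothing re-sends that message, so the next iteration never happens.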
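A simple workaround is to change the order of operations in the doCount method: first send the message to self, then perform the risky operation. This should work with the Resume strategy, but I would test a few scenarios before actually relying on this approach. What I'm not sure about is whether Akka discards the mailbox in the case of the Restart strategy. I believe it doesn't, which means that after the actor is restarted it will still receive the pending messages. (The Akka documentation confirms this: a restart leaves the mailbox contents intact, and only the message that was being processed when the exception was thrown is not delivered again.)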
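Here is a minimal sketch of the reordering, assuming Akka classic actors as in the question. To make the run checkable, it adds a few hypothetical pieces that are not part of the question:

- a `done: Promise[List[Int]]` that the actor completes once it passes `end`;
- a `counted` buffer of logged values;
- a `ReorderedCountDemo.run()` helper.

The supervisor uses only `Resume`, and the child has no `preRestart` hook.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Resume
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object SomeExternalDep {
  private var flag = true
  // Throws exactly once, the first time it is called with 3 (as in the question).
  def potentiallyThrows(curr: Int): Unit =
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
}

object CountingActor {
  case object Start
  case class Count(n: Int)
  def props(start: Int, end: Int, done: Promise[List[Int]]): Props =
    Props(new CountingActor(start, end, done))
}

class CountingActor(start: Int, end: Int, done: Promise[List[Int]])
    extends Actor with ActorLogging {
  import CountingActor._
  private var curr: Int = start
  private val counted = ListBuffer.empty[Int] // hypothetical: values logged so far

  private def doCount(): Unit = {
    // 1. Enqueue the next step first, so a failure below cannot break the chain.
    if (curr <= end) self ! Count(curr)
    else done.trySuccess(counted.toList) // hypothetical "finished" signal
    // 2. Only then make the risky call; if it throws, Count(curr) is already queued.
    SomeExternalDep.potentiallyThrows(curr)
  }

  override def receive: Receive = {
    case Start => doCount()
    case Count(n) =>
      log.info(s"Counting: $n")
      counted += n
      curr += 1
      doCount()
  }
}

class SupervisorActor(done: Promise[List[Int]]) extends Actor {
  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: RuntimeException => Resume // keeps curr and counted intact
    }

  override def preStart(): Unit =
    context.actorOf(CountingActor.props(0, 5, done)) ! CountingActor.Start

  override def receive: Receive = Actor.emptyBehavior
}

// Hypothetical test harness: runs the system until the child reports completion.
object ReorderedCountDemo {
  def run(): List[Int] = {
    val done = Promise[List[Int]]()
    val system = ActorSystem("counting-sys")
    system.actorOf(Props(new SupervisorActor(done)))
    try Await.result(done.future, 5.seconds)
    finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

With `Resume`, the failure while handling `Count(2)` leaves `curr` at 3 and `Count(3)` already queued, so `run()` should return `List(0, 1, 2, 3, 4, 5)`.

One scenario worth testing, as cautioned above, is combining this reordering with the question's `Restart` strategy and its `preRestart` hook. The restarted actor would then find both the already-queued `Count` and the new `Start` in its mailbox, and two counting chains would run.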
Another workaround is to have the supervisor re-send a message to the child after resuming it.
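Here is one way this could look. It is again a hedged sketch on Akka classic, with the same hypothetical `done` promise, `counted` buffer and `run()` helper. `doCount` keeps the question's original order, and the supervisor's decider sends `Start` to the child before returning `Resume`. This relies on two things:

- The decider runs inside the supervisor, so it can read the stored `countingActor` ref.
- A failed child stays suspended until the directive is applied, so the `Start` simply waits in its mailbox.

Because the resumed child still has `curr == 3`, that `Start` re-runs the step that failed.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Resume
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object SomeExternalDep {
  private var flag = true
  // Throws exactly once, the first time it is called with 3 (as in the question).
  def potentiallyThrows(curr: Int): Unit =
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
}

object CountingActor {
  case object Start
  case class Count(n: Int)
  def props(start: Int, end: Int, done: Promise[List[Int]]): Props =
    Props(new CountingActor(start, end, done))
}

// Same order as the question: the risky call still comes before the send.
class CountingActor(start: Int, end: Int, done: Promise[List[Int]])
    extends Actor with ActorLogging {
  import CountingActor._
  private var curr: Int = start
  private val counted = ListBuffer.empty[Int] // hypothetical: values logged so far

  private def doCount(): Unit = {
    SomeExternalDep.potentiallyThrows(curr)
    if (curr <= end) self ! Count(curr)
    else done.trySuccess(counted.toList) // hypothetical "finished" signal
  }

  override def receive: Receive = {
    case Start => doCount()
    case Count(n) =>
      log.info(s"Counting: $n")
      counted += n
      curr += 1
      doCount()
  }
}

class SupervisorActor(done: Promise[List[Int]]) extends Actor {
  private var countingActor: ActorRef = _

  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: RuntimeException =>
        // The failed child is suspended while this runs, so Start waits in its
        // mailbox and is processed once the Resume directive takes effect.
        countingActor ! CountingActor.Start
        Resume
    }

  override def preStart(): Unit = {
    countingActor = context.actorOf(CountingActor.props(0, 5, done))
    countingActor ! CountingActor.Start
  }

  override def receive: Receive = Actor.emptyBehavior
}

// Hypothetical test harness: runs the system until the child reports completion.
object ResendOnResumeDemo {
  def run(): List[Int] = {
    val done = Promise[List[Int]]()
    val system = ActorSystem("counting-sys")
    system.actorOf(Props(new SupervisorActor(done)))
    try Await.result(done.future, 5.seconds)
    finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

This decider re-sends `Start` for every `RuntimeException`, so a dependency that keeps failing would make the child retry forever. `maxNrOfRetries` is documented as a limit on restarts, so it would not stop this loop.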
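Edit: I took a closer look at the Akka source code, and there is no obvious way to hook into the resume event. There is an internal Resume system message, but it is private to Akka and is never delivered to your actual actor. So if you want Resume semantics, I would not bother with the supervisor at all. Just catch the possible exceptions inside the actor, essentially emulating the Resume strategy. That should give you the expected behavior without having to deal with possible corner cases.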
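Here is a minimal sketch of catching the exception inside the actor. It assumes Akka classic and the same hypothetical `done`/`counted`/`run()` test scaffolding. The risky call is wrapped in `try`/`catch` with `scala.util.control.NonFatal`, and the failure is only logged. `curr` is left untouched, which is what `Resume` would have done, and the next `Count` is sent either way. No custom supervisor is involved.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object SomeExternalDep {
  private var flag = true
  // Throws exactly once, the first time it is called with 3 (as in the question).
  def potentiallyThrows(curr: Int): Unit =
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
}

object CountingActor {
  case object Start
  case class Count(n: Int)
  def props(start: Int, end: Int, done: Promise[List[Int]]): Props =
    Props(new CountingActor(start, end, done))
}

class CountingActor(start: Int, end: Int, done: Promise[List[Int]])
    extends Actor with ActorLogging {
  import CountingActor._
  private var curr: Int = start
  private val counted = ListBuffer.empty[Int] // hypothetical: values logged so far

  private def doCount(): Unit = {
    // Handle the failure locally instead of letting it reach the supervisor.
    // The actor's state is kept as-is, which is what Resume would do.
    try SomeExternalDep.potentiallyThrows(curr)
    catch {
      case NonFatal(e) => log.warning(s"External dependency failed at $curr: ${e.getMessage}")
    }
    // Reached whether or not the call above threw, so the chain never breaks.
    if (curr <= end) self ! Count(curr)
    else done.trySuccess(counted.toList) // hypothetical "finished" signal
  }

  override def receive: Receive = {
    case Start => doCount()
    case Count(n) =>
      log.info(s"Counting: $n")
      counted += n
      curr += 1
      doCount()
  }
}

// Hypothetical test harness: no custom supervisor is needed for this approach.
object CatchInsideDemo {
  def run(): List[Int] = {
    val done = Promise[List[Int]]()
    val system = ActorSystem("counting-sys")
    system.actorOf(CountingActor.props(0, 5, done))
    try Await.result(done.future, 5.seconds)
    finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

Here `run()` should return `List(0, 1, 2, 3, 4, 5)`. The failed call at 3 is not retried, which matches how `Resume` drops the message that was being processed.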