Retry on minor Exceptions for a long-living akka actor

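I have an actor that is created at application startup as a child of another actor. Once a day it receives a message from the parent telling it to fetch some files from an SFTP server.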

Now, minor temporary connection exceptions can cause the operation to fail. In that case, a retry is needed.

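But there can also be cases where an exception is thrown that a retry will not resolve (e.g. file not found, some configuration is incorrect, etc.).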

So, given that the actor receives messages at a long interval (once a day), what would be an appropriate retry mechanism and supervision strategy?

In this case, the message sent to the actor is not bad input; it is just a trigger. Example:

case object FileFetch

If I have a supervision strategy like this in the parent, it will restart the failing child on every minor/major exception without retrying.

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case _: Exception => Restart
  }

What I want is something like this:

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case _: MinorException => Retry same message 2, 3 times and then Restart // pseudocode: the desired behavior
    case _: Exception      => Restart
  }

"Retrying", or resending a message when an exception occurs, is something you have to implement yourself. From the documentation:

If an exception is thrown while a message is being processed (i.e. taken out of its mailbox and handed over to the current behavior), then this message will be lost. It is important to understand that it is not put back on the mailbox. So if you want to retry processing of a message, you need to deal with it yourself by catching the exception and retry[ing] your flow. Make sure that you put a bound on the number of retries since you don’t want a system to livelock (so consuming a lot of cpu cycles without making progress).
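The "catch the exception and retry, with a bound" advice in that passage can be sketched in plain Scala, independent of Akka. This is a minimal illustration under assumptions: `BoundedRetry` and its `retry` method are hypothetical names, and `MinorException` is defined here only as a stand-in for the question's exception type. Retrying immediately like this also keeps the actor busy while it retries, so it suits only short, cheap attempts.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the question's MinorException.
class MinorException(msg: String) extends Exception(msg)

object BoundedRetry {
  // Runs `op` at most `maxAttempts` times.
  // Only a MinorException triggers another attempt; any other
  // exception is returned immediately as a Failure.
  @tailrec
  def retry[A](maxAttempts: Int)(op: () => A): Try[A] =
    Try(op()) match {
      case Failure(_: MinorException) if maxAttempts > 1 =>
        retry(maxAttempts - 1)(op)
      case result =>
        result
    }
}
```

Inside the child, the body of the `FileFetch` handler could be passed as `op`; a returned `Failure` could then be rethrown so that the parent's supervision strategy takes over.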

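If you want to resend the FileFetch message to the child in the event of a MinorException without restarting the child, you can catch the exception in the child so that the supervision strategy is not triggered. In the try-catch block, you can send a message to the parent and let the parent track the number of retries (for example, if you want the parent to apply some kind of backoff policy). In the child: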

def receive = {
  case FileFetch =>
    try {
      ...
    } catch {
      case m: MinorException =>
        val now = System.nanoTime
        context.parent ! MinorIncident(self, now)
    }
  case ...
} 

In the parent:

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case _: Exception => Restart
  }

var numFetchRetries = 0

def receive = {
  case MinorIncident(fetcherRef, time) =>
    log.error(s"${fetcherRef} threw a MinorException at ${time}")
    if (numFetchRetries < 3) { // possibly use the time in the retry logic; e.g., a backoff
      numFetchRetries = numFetchRetries + 1
      fetcherRef ! FileFetch
    } else {
      numFetchRetries = 0
      context.stop(fetcherRef)
      ... // recreate the child
    }
  case SomeMsgFromChildThatFetchSucceeded =>
    numFetchRetries = 0
  case ...
}
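The comment in the parent's handler mentions a backoff. One possible way to compute the wait before each retry is an exponential delay with a cap. The sketch below is an assumption rather than part of the original answer: `Backoff.delay` and its parameters are hypothetical names, and it expects `base` to be small (seconds to minutes).

```scala
import scala.concurrent.duration._

object Backoff {
  // Delay before retry number `attempt` (1-based):
  // base * 2^(attempt - 1), never longer than `max`.
  def delay(attempt: Int, base: FiniteDuration, max: FiniteDuration): FiniteDuration = {
    require(attempt >= 1, "attempt is 1-based")
    // Cap the exponent so the multiplication stays in range for small bases.
    val factor = 1L << math.min(attempt - 1, 16)
    (base * factor).min(max)
  }
}
```

In the parent, instead of sending `FileFetch` back right away, the retry could be scheduled with the classic scheduler, for example `context.system.scheduler.scheduleOnce(Backoff.delay(numFetchRetries, 5.seconds, 1.minute), fetcherRef, FileFetch)` with `import context.dispatcher` in scope. The 5-second base and 1-minute cap are illustrative values only.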

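Alternatively, instead of catching the exception in the child, you can have the supervision strategy Resume the child in the event of a MinorException, while still letting the parent handle the message retry logic: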

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case m: MinorException =>
      val child = sender()
      val now = System.nanoTime
      self ! MinorIncident(child, now)
      Resume
    case _: Exception => Restart
  }