Send message to actor after restart from Supervisor

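I am using a BackoffSupervisor strategy to create a child actor that has to process some messages. I want to implement a very simple restart strategy, in which, in case of an exception:

  1. The child propagates the failing message to the supervisor
  2. The supervisor restarts the child and sends the failed message again
  3. The supervisor gives up after 3 retries
  4. Akka persistence is not an option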

What I have so far:

Supervisor definition:

import akka.actor.{OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._

// Inside the parent actor that handles `cmd: NewMail`
val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = cmd.hashCode.toString,
    minBackoff = 1.seconds,
    maxBackoff = 2.seconds,
    randomFactor = 0.2
  )
    .withSupervisorStrategy(
      OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
        case msg: MessageException =>
          println("caught specific message!")
          SupervisorStrategy.Restart
        case _: Exception => SupervisorStrategy.Restart
        case _ => SupervisorStrategy.Escalate
      })
)

val sup = context.actorOf(supervisor)

sup ! cmd

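The child is supposed to send an e-mail, but it fails (throws some exception) and propagates the exception back to the supervisor actor:

import akka.actor.Actor
import scala.util.control.NonFatal

// Custom wrapper so the supervisor strategy can recover the failed command
// (its definition is not shown in the question; this shape is an assumption)
case class MessageException(cmd: NewMail, cause: Throwable)
  extends Exception("Could not process message", cause)

class SenderActor() extends Actor {

  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    throw new Exception("surprising exception")
  }

  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        // NonFatal avoids wrapping fatal errors such as OutOfMemoryError
        case NonFatal(t) => throw MessageException(cmd, t)
      }
  }
}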

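In the code above I wrap any exception into the custom class MessageException, which gets propagated to the SupervisorStrategy. But how can I propagate it further to the new child to force reprocessing? Is this the right approach?

Edit: I tried to resend the message to the actor in the preRestart hook, but somehow the hook is not triggered: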

import akka.actor.Actor
import scala.util.control.NonFatal

class SenderActor() extends Actor {

  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    //    println("mail sent!")
    throw new Exception("surprising exception")
  }

  override def preStart(): Unit = {
    println("child starting")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    reason match {
      case m: MessageException =>
        println("aaaaa")
        // try to re-enqueue the failed message for the restarted instance
        message.foreach(self ! _)
      case _ => println("bbbb")
    }
  }

  override def postStop(): Unit = {
    println("child stopping")
  }

  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case NonFatal(t) => throw MessageException(cmd, t)
      }
  }
}

This gives me output similar to the following:

new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690] [example-akka.actor.default-dispatcher-2] [akka://example/user/persistentActor-4-scala/$a/1962829645] Could not process message sample.persistence.MessageException: Could not process message <stacktrace>
child starting

But there is no log output from the preRestart hook.

The failed child is available as the sender in your supervisor strategy. Quoting https://doc.akka.io/docs/akka/current/fault-tolerance.html#creating-a-supervisor-strategy:

If the strategy is declared inside the supervising actor (as opposed to within a companion object) its decider has access to all internal state of the actor in a thread-safe fashion, including obtaining a reference to the currently failed child (available as the sender of the failure message).

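In your case, sending e-mail through some third-party software is a risky operation. Why not apply the Circuit Breaker pattern and skip the sender actor altogether? You can also still have an actor (with some Backoff Supervisor) with a Circuit Breaker inside it, if that makes sense for you.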
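A circuit breaker stops calling a failing dependency for a while after repeated failures, then lets a single trial call through to decide whether to resume. The toy sketch below only illustrates that state machine in plain Scala; it is not Akka's akka.pattern.CircuitBreaker (which is asynchronous and scheduler-driven), and the class name, its parameters and the injected clock are hypothetical.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical, single-threaded breaker for illustration only
// (NOT akka.pattern.CircuitBreaker). Time comes from `now` (milliseconds)
// so the behaviour is deterministic.
object ToyCircuitBreaker {
  sealed trait State
  case object Closed extends State   // calls go through, failures are counted
  case object Open extends State     // calls are rejected without running
  case object HalfOpen extends State // one trial call decides Closed vs Open
}

final class ToyCircuitBreaker(maxFailures: Int, resetTimeoutMillis: Long, now: () => Long) {
  import ToyCircuitBreaker._

  private var state: State = Closed
  private var failures = 0
  private var openedAt = 0L

  // Once resetTimeoutMillis has elapsed in Open, allow a trial call.
  private def refresh(): Unit =
    if (state == Open && now() - openedAt >= resetTimeoutMillis) state = HalfOpen

  def currentState: State = { refresh(); state }

  // `op` is by-name: it is not evaluated at all while the circuit is open.
  def call[T](op: => T): Try[T] = {
    refresh()
    if (state == Open) Failure(new IllegalStateException("circuit open: call rejected"))
    else {
      val result = Try(op)
      result match {
        case Success(_) =>
          failures = 0
          state = Closed
        case Failure(_) =>
          failures += 1
          // A failed trial call, or too many failures, opens the circuit.
          if (state == HalfOpen || failures >= maxFailures) {
            state = Open
            openedAt = now()
          }
      }
      result
    }
  }
}
```

In the e-mail case, `op` would be the call to the mail server; while the breaker is open the caller fails fast instead of piling up retries against a server that is known to be down.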

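The reason the child's preRestart hook is not called is that Backoff.onFailure uses BackoffOnRestartSupervisor under the covers, which replaces the default restart behavior with a stop-and-delayed-start behavior that is consistent with the backoff policy. In other words, when you use Backoff.onFailure and the child is "restarted", its preRestart method is not called, because the underlying supervisor actually stops the child and then starts it again later. This matches your output: "child stopping" (postStop) is followed by "child starting" (preStart of a fresh instance). (Using Backoff.onStop can trigger the child's preRestart hook, but that is tangential to the current discussion.)

The BackoffSupervisor API does not support automatically resending a message when the supervisor's child restarts: you have to implement this behavior yourself. One idea for retrying messages is to let the BackoffSupervisor's own supervisor (the actor that creates it) handle it. For example: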

// Assumed message types (not shown here), e.g.:
//   case class Error(cmd: NewMail); case class Replay(cmd: NewMail); case object ChildIsStopped
// The enclosing actor must mix in akka.actor.Timers to use `timers`.
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    ...
  ).withReplyWhileStopped(ChildIsStopped)
    .withSupervisorStrategy(
      OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
        case msg: MessageException =>
          println("caught specific message!")
          self ! Error(msg.cmd) // replace cmd with whatever the property name is
          SupervisorStrategy.Restart
        case ...
      })
)

val sup = context.actorOf(supervisor)

def receive = {
  case cmd: NewMail =>
    sup ! cmd
  case Error(cmd) =>
    // We assume that NewMail has an id field. Also, adjust the time as needed.
    timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
  case Replay(cmd) =>
    sup ! cmd
  case ChildIsStopped =>
    println("child is stopped")
}

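In the code above, the NewMail message embedded in the MessageException is wrapped in a custom case class (to easily distinguish it from a "normal"/new NewMail message) and sent to self. In this context, self is the actor that creates the BackoffSupervisor. This enclosing actor then uses a single timer to replay the original message at some point. This point in time should be far enough in the future that the BackoffSupervisor can potentially exhaust SenderActor's restart attempts, so that the child has ample opportunity to get into a "good" state before it receives the resent message. Obviously, this example resends the message only once, regardless of the number of child restarts.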
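To get a feel for what "far enough in the future" means, the worst-case restart delays can be computed. The plain Scala sketch below (no Akka needed) assumes the delay before the n-th restart (n starting at 0) is min(maxBackoff, minBackoff × 2^n) × (1 + r × randomFactor) with r drawn from [0, 1), which is how Akka 2.5's BackoffSupervisor computes it internally; the object and method names are hypothetical.

```scala
// Hypothetical helper: worst-case BackoffSupervisor delays, in milliseconds.
// ASSUMPTION: delay(n) = min(maxBackoff, minBackoff * 2^n) * (1 + r * randomFactor),
// r in [0, 1); the worst case takes r = 1.
object BackoffMath {
  // Upper bound for the delay before the n-th restart, rounded to whole millis.
  def worstCaseDelayMillis(n: Int, minBackoffMillis: Long, maxBackoffMillis: Long, randomFactor: Double): Long = {
    val base = math.min(maxBackoffMillis.toDouble, minBackoffMillis * math.pow(2, n))
    math.round(base * (1.0 + randomFactor))
  }

  // Sum of the worst-case delays over `restarts` consecutive restarts.
  def worstCaseTotalMillis(restarts: Int, minBackoffMillis: Long, maxBackoffMillis: Long, randomFactor: Double): Long =
    (0 until restarts).map(worstCaseDelayMillis(_, minBackoffMillis, maxBackoffMillis, randomFactor)).sum
}
```

With the question's settings (minBackoff = 1 s, maxBackoff = 2 s, randomFactor = 0.2) a single restart can take up to 2.4 s, slightly more than maxBackoff, and three restarts add up to at most 6 s; both are below the 10-second timer used above. These numbers ignore the time spent inside the child itself (fakeSendMail sleeps 1 s per attempt).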


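Another idea is to create a BackoffSupervisor-SenderActor pair for each NewMail message, and have the SenderActor send the NewMail message to itself in its preStart hook. One issue with this approach is cleaning up resources, i.e., shutting down the BackoffSupervisors (which in turn shut down their respective SenderActor children) when processing succeeds or when the child restarts are exhausted. A map of NewMail ids to (ActorRef, Int) tuples (where the ActorRef is a reference to a BackoffSupervisor actor and the Int is the number of restart attempts) would be helpful in this case: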

class Overlord extends Actor {

  var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long

  def receive = {
    case cmd: NewMail =>
      val childProps = Props(new SenderActor(cmd, self))
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          ...
        ).withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
            case msg: MessageException =>
              println("caught specific message!")
              self ! Error(msg.cmd)
              SupervisorStrategy.Restart
            case ...
          })
      )
      val sup = context.actorOf(supervisor)
      state += (cmd.id -> (sup, 0))

    case ProcessingDone(cmdId) =>
      state.get(cmdId) match {
        case Some((backoffSup, _)) =>
          context.stop(backoffSup)
          state -= cmdId
        case None =>
          println(s"${cmdId} not found")
      }

    case Error(cmd) =>
      val cmdId = cmd.id
      state.get(cmdId) match {
        case Some((backoffSup, numRetries)) =>
          if (numRetries == 3) {
            println(s"${cmdId} has already been retried 3 times. Giving up.")
            context.stop(backoffSup)
            state -= cmdId
          } else
            state += (cmdId -> (backoffSup, numRetries + 1))
        case None =>
          println(s"${cmdId} not found")
      }

    case ...
  }
}
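The retry bookkeeping in Overlord can be pulled out into pure functions and checked without an ActorSystem. The sketch below is a hypothetical refactoring (RetryBook, onError, onDone and Decision are made-up names); the supervisor reference is a type parameter S, which would be ActorRef inside the actor, and the give-up rule mirrors the `numRetries == 3` check above.

```scala
// Hypothetical, Akka-free model of Overlord's state handling.
// Keys are mail ids; values are (supervisor handle, retries counted so far).
object RetryBook {
  sealed trait Decision
  case object Retry extends Decision   // counter incremented, keep going
  case object GiveUp extends Decision  // limit reached, id forgotten
  case object Unknown extends Decision // id not tracked

  val MaxRetries = 3

  // Mirrors the Error(cmd) branch.
  def onError[S](state: Map[Long, (S, Int)], id: Long): (Map[Long, (S, Int)], Decision) =
    state.get(id) match {
      case Some((_, n)) if n == MaxRetries => (state - id, GiveUp)
      case Some((sup, n))                  => (state + (id -> ((sup, n + 1))), Retry)
      case None                            => (state, Unknown)
    }

  // Mirrors the ProcessingDone(cmdId) branch.
  def onDone[S](state: Map[Long, (S, Int)], id: Long): Map[Long, (S, Int)] = state - id
}
```

Inside Overlord, the Error(cmd) branch would then call `onError(state, cmd.id)`, store the returned map, and call `context.stop` on the supervisor only when the decision is GiveUp.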

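Note that the SenderActor in the example above takes a NewMail and an ActorRef as constructor parameters. The latter parameter allows the SenderActor to send a custom ProcessingDone message to the enclosing actor: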

class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
  override def preStart(): Unit = {
    println(s"child starting, sending ${cmd} to self")
    self ! cmd
  }

  def fakeSendMail(): Unit = ...

  def receive = {
    case cmd: NewMail => ...
  }
}

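Obviously, with the current implementation of fakeSendMail, SenderActor is set up to fail every time. I'll leave to you the additional changes needed in SenderActor to implement the happy path, in which SenderActor sends a ProcessingDone message to target.

In the nice solution provided by @chunjef, he warns about the risk of scheduling the job resend before the backoff supervisor has started the worker: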

This enclosing actor then uses a single timer to replay the original message at some point. This point in time should be far enough in the future such that the BackoffSupervisor can potentially exhaust SenderActor's restart attempts, so that the child can have ample opportunity to get in a "good" state before it receives the resent message.

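If that happens, the job turns into a dead letter and no further progress is made. I made a simplified fiddle of this scenario.

Therefore, the scheduled delay should be greater than maxBackoff, which may hurt job completion time. One possible solution that avoids this is to have the worker actor send a message to its parent when it is ready to work, like here.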
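The readiness idea can be modeled without Akka: the parent holds on to a job while the worker is down and forwards it only once the (re)started worker announces itself, for example from its preStart hook. The sketch below is hypothetical (ReadyHandshake and its methods are made-up names) and is not the code behind the link; it only shows why no timer delay has to be guessed.

```scala
import scala.collection.immutable.Queue

// Hypothetical, Akka-free model of "the worker tells its parent when it is ready".
// In an actor, each method would be the parent's reaction to one message.
final case class ReadyHandshake(workerReady: Boolean, pending: Queue[String], sent: List[String]) {
  // A new job arrives: forward it if the worker is up, otherwise hold it.
  def submit(job: String): ReadyHandshake =
    if (workerReady) copy(sent = sent :+ job)
    else copy(pending = pending.enqueue(job))

  // The worker failed on `job` and is stopped by the backoff supervisor:
  // put the job back at the front of the queue.
  def workerFailed(job: String): ReadyHandshake =
    copy(workerReady = false, pending = job +: pending)

  // The restarted worker announced itself: flush everything held so far.
  def workerStarted: ReadyHandshake =
    copy(workerReady = true, pending = Queue.empty, sent = sent ++ pending)
}

object ReadyHandshake {
  val initial: ReadyHandshake = ReadyHandshake(workerReady = false, pending = Queue.empty, sent = Nil)
}
```

Because the resend is triggered by the worker's own announcement rather than by a timer, it cannot reach the backoff supervisor while the child is still stopped, whatever maxBackoff and randomFactor are.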