Applying custom action on SupervisorStrategy after maxNrOfRetries?

My parent actor looks like this:

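import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart
import akka.event.LoggingReceive
import scala.concurrent.duration._
import scala.language.postfixOps

case object StartRemoteProcessor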

class ConnectorActor extends Actor with ActorLogging {
  override def supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 20 seconds) {
    case e: OutOfMemoryError =>
      log.error("Exception received => " + e.getMessage)
      Restart
    case e: IllegalArgumentException =>
      log.error("Exception received => " + e.getMessage)
      Restart
  }

  def receive = LoggingReceive {
    case StartRemoteProcessor =>
      val remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
      log.info("Starting Remote Processor")
      remoteProcessor ! "Start"
    case "ProcessingStopped" =>
      notifyFailure()
  }

  def notifyFailure() = {
    log.info("notifying failure to server")
  }
}

According to the docs:

The child actor is stopped if the limit is exceeded.

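Requirement: I want notifyFailure to be called only once the child has been stopped after exceeding the retry limit.

In my child actor I have: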

class ProcessingActor extends Actor with ActorLogging {

  override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
  override def preStart(): Unit = ()
  override def postStop(): Unit = context.parent ! "ProcessingStopped"

  def receive = LoggingReceive {
    case "Start" =>
      log.info("ProcessingActor path => " + self.path)
      startProcessing()
  }

  def startProcessing() = {
    println("executing startProcessing")
    throw new IllegalArgumentException("not implemented by choice")
  }
}

But in the logs I see that notifyFailure is called on every Restart:

[INFO] [07/24/2015 11:57:50.107] [main] [Remoting] Starting remoting
[INFO] [07/24/2015 11:57:50.265] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ConnectorSystem@127.0.0.1:2554]
[INFO] [07/24/2015 11:57:50.266] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ConnectorSystem@127.0.0.1:2554]
ConnectorSystem Started
[INFO] [07/24/2015 11:57:50.277] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Starting Remote Processor
[ERROR] [07/24/2015 11:57:50.509] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.511] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.526] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.533] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.538] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.545] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server

How can I achieve this behavior?

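As @rkuhn recommended on the gitter chat, the following worked for me. The default preRestart ends by calling postStop(), which is why the parent was notified on every restart; overriding it as a no-op prevents that: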

override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()

The complete set of lifecycle overrides in ProcessingActor:

  override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
  override def postStop(): Unit = context.parent ! "ProcessingStopped"
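
To make the mechanism easier to see, here is a minimal, dependency-free sketch of the restart hooks. It is not Akka code: the names LifecycleModel, Simulation, DefaultHooks and NoOpPreRestart are hypothetical, and the model reuses a single instance, whereas Akka creates a fresh actor instance on restart. It only assumes the default wiring in which preRestart calls postStop() and postRestart calls preStart().

```scala
// Simplified model of an actor's lifecycle hooks (NOT Akka's implementation).
trait LifecycleModel {
  def preStart(): Unit = ()
  def postStop(): Unit = ()
  // Mirrors the default wiring: preRestart ends by calling postStop().
  def preRestart(reason: Throwable, message: Option[Any]): Unit = postStop()
  // Mirrors the default wiring: postRestart calls preStart().
  def postRestart(reason: Throwable): Unit = preStart()
}

// Hypothetical driver standing in for the supervisor's decisions.
object Simulation {
  // A restart runs preRestart, then postRestart.
  def restart(a: LifecycleModel, reason: Throwable): Unit = {
    a.preRestart(reason, None)
    a.postRestart(reason)
  }
  // A final stop runs postStop.
  def stop(a: LifecycleModel): Unit = a.postStop()
}

// Counts how often postStop would have notified the parent.
class DefaultHooks extends LifecycleModel {
  var notifications = 0
  override def postStop(): Unit = notifications += 1
}

// Same counter, but preRestart is a no-op, as in the fix above.
class NoOpPreRestart extends DefaultHooks {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
}

object Demo {
  def main(args: Array[String]): Unit = {
    val err = new IllegalArgumentException("not implemented by choice")
    for (actor <- Seq(new DefaultHooks, new NoOpPreRestart)) {
      // Three restarts, then the stop once the retry limit is exceeded.
      (1 to 3).foreach(_ => Simulation.restart(actor, err))
      Simulation.stop(actor)
      println(s"${actor.getClass.getSimpleName}: ${actor.notifications}")
    }
  }
}
```

In this model the default hooks produce four notifications (three restarts plus the final stop), which is consistent with the four "notifying failure to server" lines in the log above, while the no-op preRestart produces exactly one.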

daydreamer's answer will work, but another approach is to watch the child actor from the parent and call notifyFailure when you receive the Terminated message:

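import akka.actor.{ActorRef, Terminated}

// Keep a reference to the child so Terminated can be compared against it.
var remoteProcessor: ActorRef = _

def receive = LoggingReceive {
  case StartRemoteProcessor =>
    remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
    // Register for a Terminated message when the child stops for good.
    context.watch(remoteProcessor)
    log.info("Starting Remote Processor")
    remoteProcessor ! "Start"

  // The guard compares against the field; a bare Terminated(remoteProcessor)
  // would bind a new variable and match any watched actor.
  case Terminated(ref) if ref == remoteProcessor =>
    notifyFailure()
}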

This way you don't need to customize the Actor lifecycle methods, which I find can be a rich source of bugs.