在 maxNrOfRetries 之后对 SupervisorStrategy 应用自定义操作?
Applying custom action on SupervisorStrategy after maxNrOfRetries?
我的Parent演员长得像
case object StartRemoteProcessor
class ConnectorActor extends Actor with ActorLogging {
override def supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 20 seconds) {
case e: OutOfMemoryError =>
log.error("Exception received => " + e.getMessage)
Restart
case e: IllegalArgumentException =>
log.error("Exception received => " + e.getMessage)
Restart
}
def receive = LoggingReceive {
case StartRemoteProcessor =>
val remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
log.info("Starting Remote Processor")
remoteProcessor ! "Start"
case "ProcessingStopped" =>
notifyFailure()
}
def notifyFailure() = {
log.info("notifying failure to server")
}
}
根据docs
The child actor is stopped if the limit is exceeded.
要求
- 一旦
maxNrOfRetries
耗尽,我想在 actor 最终停止时执行自定义操作 notifyFailure
。这将发送电子邮件
在我的 Child Actor
我有
class ProcessingActor extends Actor with ActorLogging {
override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
override def preStart(): Unit = ()
override def postStop(): Unit = context.parent ! "ProcessingStopped"
def receive = LoggingReceive {
case "Start" =>
log.info("ProcessingActor path => " + self.path)
startProcessing()
}
def startProcessing() = {
println("executing startProcessing")
throw new IllegalArgumentException("not implemented by choice")
}
}
但是在日志中,我看到每个 Restart
都会调用 notifyFailure
[INFO] [07/24/2015 11:57:50.107] [main] [Remoting] Starting remoting
[INFO] [07/24/2015 11:57:50.265] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ConnectorSystem@127.0.0.1:2554]
[INFO] [07/24/2015 11:57:50.266] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ConnectorSystem@127.0.0.1:2554]
ConnectorSystem Started
[INFO] [07/24/2015 11:57:50.277] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Starting Remote Processor
[ERROR] [07/24/2015 11:57:50.509] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.511] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.526] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.533] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.538] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.545] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
我怎样才能实现这种行为?
正如@rkuhn 在 gitter chat 上的推荐,以下对我有用
override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
所有代码?
override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
override def postStop(): Unit = context.parent ! "ProcessingStopped"
daydreamer 的答案会起作用,但另一种方法可能是从父级观看子演员,当您收到终止消息时,执行 notifyFailure
var remoteProcessor:ActorRef = _
def receive = LoggingReceive {
case StartRemoteProcessor =>
remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
context.watch(remoteProcessor)
log.info("Starting Remote Processor")
remoteProcessor ! "Start"
case Terminated(remoteProcessor) =>
notifyFailure()
}
这样您就不需要自定义 Actor 生命周期方法,我发现这可能是错误的丰富来源。
我的Parent演员长得像
case object StartRemoteProcessor
class ConnectorActor extends Actor with ActorLogging {
override def supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 20 seconds) {
case e: OutOfMemoryError =>
log.error("Exception received => " + e.getMessage)
Restart
case e: IllegalArgumentException =>
log.error("Exception received => " + e.getMessage)
Restart
}
def receive = LoggingReceive {
case StartRemoteProcessor =>
val remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
log.info("Starting Remote Processor")
remoteProcessor ! "Start"
case "ProcessingStopped" =>
notifyFailure()
}
def notifyFailure() = {
log.info("notifying failure to server")
}
}
根据docs
The child actor is stopped if the limit is exceeded.
要求
- 一旦
maxNrOfRetries
耗尽,我想在 actor 最终停止时执行自定义操作notifyFailure
。这将发送电子邮件
在我的 Child Actor
我有
class ProcessingActor extends Actor with ActorLogging {
override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
override def preStart(): Unit = ()
override def postStop(): Unit = context.parent ! "ProcessingStopped"
def receive = LoggingReceive {
case "Start" =>
log.info("ProcessingActor path => " + self.path)
startProcessing()
}
def startProcessing() = {
println("executing startProcessing")
throw new IllegalArgumentException("not implemented by choice")
}
}
但是在日志中,我看到每个 Restart
notifyFailure
[INFO] [07/24/2015 11:57:50.107] [main] [Remoting] Starting remoting
[INFO] [07/24/2015 11:57:50.265] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ConnectorSystem@127.0.0.1:2554]
[INFO] [07/24/2015 11:57:50.266] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ConnectorSystem@127.0.0.1:2554]
ConnectorSystem Started
[INFO] [07/24/2015 11:57:50.277] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Starting Remote Processor
[ERROR] [07/24/2015 11:57:50.509] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.511] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.526] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.533] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.538] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.545] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
我怎样才能实现这种行为?
正如@rkuhn 在 gitter chat 上的推荐,以下对我有用
override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
所有代码?
override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
override def postStop(): Unit = context.parent ! "ProcessingStopped"
daydreamer 的答案会起作用,但另一种方法可能是从父级观看子演员,当您收到终止消息时,执行 notifyFailure
var remoteProcessor:ActorRef = _
def receive = LoggingReceive {
case StartRemoteProcessor =>
remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
context.watch(remoteProcessor)
log.info("Starting Remote Processor")
remoteProcessor ! "Start"
case Terminated(remoteProcessor) =>
notifyFailure()
}
这样您就不需要自定义 Actor 生命周期方法,我发现这可能是错误的丰富来源。