Child actor's scheduler is not stopped and throws an NPE after stopping the parent actor
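Flow:
- I have 3 levels of actors (A, B and C). A creates B, and B creates C: A --> B --> C
- B and C each have 2 states: working and ending. Once C has finished all its work, it enters the ending state, where it starts a scheduler that sends CFinished messages to B.
- When B receives a CFinished message, it also enters its ending state and starts its own scheduler, which sends BFinished messages to A.
- When A receives a BFinished message, it also enters the ending state and calls context stop self, which stops A itself and all the children it has created.
Problem:
It looks like the schedulers are not stopped when the actors are stopped, and the next time they fire they throw a NullPointerException.
How can I fix this?
Code: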
import scala.concurrent.duration._
import akka.actor.{Actor, ActorLogging, Cancellable, Props}
import com.zinio.damntool.msg.{BFinished, CFinished}

object msg {
  case class BFinished()
  case class CFinished()
}

object A {
  def props(): Props = Props(new A())
}

class A() extends Actor with ActorLogging {
  implicit val ec = context.dispatcher
  context actorOf B.props
  def receive = working
  def working: Receive = {
    case BFinished =>
      log.warning("[Actor B] -> Received: BFinished")
      context stop self
  }
}

object B {
  def props(): Props = Props(new B())
}

class B() extends Actor with ActorLogging {
  implicit val ec = context.dispatcher
  var cancellable: Cancellable = _
  context actorOf C.props
  def receive = working
  def working: Receive = {
    case CFinished =>
      log.warning("[Actor B] -> Received: CFinished")
      cancellable = scheduleEndActor
      context become ending
  }
  def ending: Receive = {
    case _ =>
  }
  def scheduleEndActor = context.system.scheduler.schedule(0.seconds, 5.seconds)(endActor())
  def endActor() = context.parent ! BFinished
}

object C {
  def props(): Props = Props(new C())
}

class C() extends Actor with ActorLogging {
  implicit val ec = context.dispatcher
  var cancellable: Cancellable = _
  self ! CFinished
  def receive = working
  def working: Receive = {
    case CFinished =>
      log.warning("[Actor C] -> Received: CFinished")
      cancellable = scheduleEndActor
      context become ending
  }
  def ending: Receive = {
    case _ =>
  }
  def scheduleEndActor = context.system.scheduler.schedule(0.seconds, 5.seconds)(endActor())
  def endActor() = context.parent ! CFinished
}
Error:
[WARN] [10/20/2016 00:05:59.700] [test-system-akka.actor.default-dispatcher-4] [akka://test-system/user/$a/$a/$a] [Actor C] -> Received: CFinished
[WARN] [10/20/2016 00:05:59.706] [test-system-akka.actor.default-dispatcher-2] [akka://test-system/user/$a/$a] [Actor B] -> Received: CFinished
[WARN] [10/20/2016 00:05:59.709] [test-system-akka.actor.default-dispatcher-3] [akka://test-system/user/$a] [Actor B] -> Received: BFinished
[ERROR] [10/20/2016 00:06:04.714] [test-system-akka.actor.default-dispatcher-8] [TaskInvocation] null
java.lang.NullPointerException
at com.test.B.endActor(Test.scala:62)
at com.test.B$$anonfun$scheduleEndActor.apply$mcV$sp(Test.scala:60)
at akka.actor.Scheduler$$anon.run(Scheduler.scala:78)
at akka.actor.LightArrayRevolverScheduler$$anon$$anon.run(LightArrayRevolverScheduler.scala:104)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:409)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ERROR] [10/20/2016 00:06:04.714] [test-system-akka.actor.default-dispatcher-2] [TaskInvocation] null
java.lang.NullPointerException
at com.test.C.endActor(Test.scala:90)
at com.test.C$$anonfun$scheduleEndActor.apply$mcV$sp(Test.scala:88)
at akka.actor.Scheduler$$anon.run(Scheduler.scala:78)
at akka.actor.LightArrayRevolverScheduler$$anon$$anon.run(LightArrayRevolverScheduler.scala:104)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:409)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I just realized that I can stop the scheduler by overriding the postStop method. That way, the NullPointerException is no longer thrown.
override def postStop(): Unit = cancellable.cancel
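One caveat with the one-liner above: `cancellable` is declared as `var cancellable: Cancellable = _`, so it is still null if the actor is stopped before it reaches the ending state, and postStop would then throw as well. The following is a minimal sketch of a guarded variant, not the asker's final code. It assumes the messages are modelled as case objects inside a hypothetical `Protocol` object (the question uses case classes), and it reads `context.parent` once on the actor's own thread so the scheduled block only closes over an ActorRef.

```scala
import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}

// Assumption: messages are case objects here; `Protocol` is a hypothetical name.
object Protocol {
  case object CFinished
  case object BFinished
}

class B extends Actor {
  import Protocol._
  import context.dispatcher // ExecutionContext required by the scheduler

  // None until the timer is started, so postStop is safe in either state.
  private var cancellable: Option[Cancellable] = None

  def receive: Receive = working

  def working: Receive = {
    case CFinished =>
      // Read context.parent here, while processing a message, and let the
      // scheduled block capture only the resulting ActorRef.
      val parent: ActorRef = context.parent
      cancellable = Some(
        context.system.scheduler.schedule(0.seconds, 5.seconds) {
          parent ! BFinished
        }
      )
      context.become(ending)
  }

  def ending: Receive = {
    case _ => // ignore everything while waiting to be stopped
  }

  // Called when the actor stops, including when its parent stops it.
  override def postStop(): Unit = cancellable.foreach(_.cancel())
}
```

The same pattern would apply to C, with CFinished sent to its parent instead.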
First: if the initial delay is zero, why do you need a scheduler at all?
Second: if you really do need it, you can use context.system.scheduler.scheduleOnce(x.seconds, context.parent, BFinished). That way you do not need cancellable.cancel at all.
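A minimal sketch of the one-shot suggestion, assuming a 5-second delay in place of the unspecified `x.seconds` and case-object messages (both are assumptions made for illustration). With this overload, `context.parent` is evaluated as an ordinary argument inside receive, so nothing touches the actor's context from the scheduler thread later on.

```scala
import scala.concurrent.duration._
import akka.actor.Actor

// Assumption: messages are modelled as case objects for this sketch.
case object CFinished
case object BFinished

class B extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  def receive: Receive = working

  def working: Receive = {
    case CFinished =>
      // The scheduler keeps only the ActorRef and the message; the delay
      // of 5 seconds is a placeholder value.
      context.system.scheduler.scheduleOnce(5.seconds, context.parent, BFinished)
      context.become(ending)
  }

  def ending: Receive = {
    case _ => // ignore everything while waiting to be stopped
  }
}
```

Because the task fires at most once, there is no recurring timer left behind when the actor stops, which is why no explicit cancel is needed in this variant.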
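It throws the NPE because you are accessing context.parent from another thread: as the stack trace shows, the scheduled block calls endActor() on a dispatcher thread, outside the actor's own message processing.
Use: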
def scheduleEndActor = context.system.scheduler.schedule(0.seconds, 5.seconds, context.parent, BFinished)
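To show that line in context, here is a small sketch of C using the ActorRef overload, again assuming a case-object message. The arguments, including `context.parent`, are evaluated once when `scheduleEndActor()` is called from receive. The timer still repeats until it is cancelled, so it makes sense to keep the returned Cancellable and cancel it in postStop as described earlier in the thread; otherwise the messages would keep being sent to a stopped parent.

```scala
import scala.concurrent.duration._
import akka.actor.{Actor, Cancellable}

// Assumption: the message is modelled as a case object for this sketch.
case object CFinished

class C extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  self ! CFinished // as in the question: C reports completion immediately

  def receive: Receive = working

  def working: Receive = {
    case CFinished =>
      scheduleEndActor() // the returned Cancellable should be kept and cancelled on stop
      context.become(ending)
  }

  def ending: Receive = {
    case _ => // ignore everything while waiting to be stopped
  }

  // context.parent is resolved here, on the actor's thread; the scheduler
  // then sends CFinished to that ActorRef every 5 seconds.
  private def scheduleEndActor(): Cancellable =
    context.system.scheduler.schedule(0.seconds, 5.seconds, context.parent, CFinished)
}
```

As far as I know, newer Akka versions (2.6 and later) deprecate `schedule` in favour of `scheduleWithFixedDelay` and `scheduleAtFixedRate`, which offer the same ActorRef-based overloads.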