由 BackoffSupervisor 监督的 Actor 在重启后丢失隐藏的消息
Actor supervised by BackoffSupervisor loses stashed messages after restart
我有一个 stash 使用的演员。有时,当它崩溃时,它会丢失所有隐藏的消息。我发现这取决于我使用什么监督逻辑。
我写了一个简单的例子。
藏匿的演员:
case object WrongMessage
case object TestMessage
case object InitialMessage
class TestActor extends Actor with Stash {
override def receive: Receive = uninitializedReceive
def uninitializedReceive: Receive = {
case TestMessage =>
println(s"stash test message")
stash()
case WrongMessage =>
println(s"wrong message")
throw new Throwable("wrong message")
case InitialMessage =>
println(s"initial message")
context.become(initializedReceive)
unstashAll()
}
def initializedReceive: Receive = {
case TestMessage =>
println(s"test message")
}
}
在下面的代码中,TestActor
never 收到隐藏的 TestMessage
:
object Test1 extends App {
implicit val system: ActorSystem = ActorSystem()
val actorRef = system.actorOf(BackoffSupervisor.props(Backoff.onFailure(
Props[TestActor], "TestActor", 1 seconds, 1 seconds, 0
).withSupervisorStrategy(OneForOneStrategy()({
case _ => SupervisorStrategy.Restart
}))))
actorRef ! TestMessage
Thread.sleep(5000L)
actorRef ! WrongMessage
Thread.sleep(5000L)
actorRef ! InitialMessage
}
但是这段代码运行良好:
class SupervisionActor extends Actor {
val testActorRef: ActorRef = context.actorOf(Props[TestActor])
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy()({
case _ => SupervisorStrategy.Restart
})
override def receive: Receive = {
case message => testActorRef forward message
}
}
object Test2 extends App {
implicit val system: ActorSystem = ActorSystem()
val actorRef = system.actorOf(Props(classOf[SupervisionActor]))
actorRef ! TestMessage
Thread.sleep(5000L)
actorRef ! WrongMessage
Thread.sleep(5000L)
actorRef ! InitialMessage
}
我调查了资料,发现演员监督使用
LocalActorRef.restart method which backed by system dispatcher logic, but BackoffSupervisor
simply creates a new actor after termination of the old one。有什么办法可以解决这个问题吗?
我不确定是否可以在 BackoffSupervisor
下使 restart
正确发送隐藏消息而无需一些自定义重新实现工作。
正如您已经指出的,BackoffSupervisor
有自己的 restart
绕过标准 actor 生命周期。事实上,BackoffOnRestartSupervisor 源代码中明确指出:
Whatever the final Directive is, we will translate all Restarts to our
own Restarts, which involves stopping the child.
如果你还没有读过这篇文章reported issue, it has a relevant discussion re: problem with Backoff.onFailure。
Backoff.onStop 也会提供想要的 BackoffSupervisor 功能,但不幸的是它有自己的用例,不会被异常触发。
我有一个 stash 使用的演员。有时,当它崩溃时,它会丢失所有隐藏的消息。我发现这取决于我使用什么监督逻辑。
我写了一个简单的例子。
藏匿的演员:
case object WrongMessage
case object TestMessage
case object InitialMessage
class TestActor extends Actor with Stash {
override def receive: Receive = uninitializedReceive
def uninitializedReceive: Receive = {
case TestMessage =>
println(s"stash test message")
stash()
case WrongMessage =>
println(s"wrong message")
throw new Throwable("wrong message")
case InitialMessage =>
println(s"initial message")
context.become(initializedReceive)
unstashAll()
}
def initializedReceive: Receive = {
case TestMessage =>
println(s"test message")
}
}
在下面的代码中,TestActor
never 收到隐藏的 TestMessage
:
object Test1 extends App {
implicit val system: ActorSystem = ActorSystem()
val actorRef = system.actorOf(BackoffSupervisor.props(Backoff.onFailure(
Props[TestActor], "TestActor", 1 seconds, 1 seconds, 0
).withSupervisorStrategy(OneForOneStrategy()({
case _ => SupervisorStrategy.Restart
}))))
actorRef ! TestMessage
Thread.sleep(5000L)
actorRef ! WrongMessage
Thread.sleep(5000L)
actorRef ! InitialMessage
}
但是这段代码运行良好:
class SupervisionActor extends Actor {
val testActorRef: ActorRef = context.actorOf(Props[TestActor])
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy()({
case _ => SupervisorStrategy.Restart
})
override def receive: Receive = {
case message => testActorRef forward message
}
}
object Test2 extends App {
implicit val system: ActorSystem = ActorSystem()
val actorRef = system.actorOf(Props(classOf[SupervisionActor]))
actorRef ! TestMessage
Thread.sleep(5000L)
actorRef ! WrongMessage
Thread.sleep(5000L)
actorRef ! InitialMessage
}
我调查了资料,发现演员监督使用
LocalActorRef.restart method which backed by system dispatcher logic, but BackoffSupervisor
simply creates a new actor after termination of the old one。有什么办法可以解决这个问题吗?
我不确定是否可以在 BackoffSupervisor
下使 restart
正确发送隐藏消息而无需一些自定义重新实现工作。
正如您已经指出的,BackoffSupervisor
有自己的 restart
绕过标准 actor 生命周期。事实上,BackoffOnRestartSupervisor 源代码中明确指出:
Whatever the final Directive is, we will translate all Restarts to our own Restarts, which involves stopping the child.
如果你还没有读过这篇文章reported issue, it has a relevant discussion re: problem with Backoff.onFailure。
Backoff.onStop 也会提供想要的 BackoffSupervisor 功能,但不幸的是它有自己的用例,不会被异常触发。