Send message to actor after restart from Supervisor
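I am using a BackoffSupervisor strategy to create a child actor that has to process some messages. I want to implement a very simple restart strategy in which, in case of an exception:
- The child propagates the failing message to the supervisor
- The supervisor restarts the child and sends the failing message again
- The supervisor gives up after 3 retries
- Akka persistence is not an option
So far what I have is this:
Supervisor definition: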
val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = cmd.hashCode.toString,
    minBackoff = 1.seconds,
    maxBackoff = 2.seconds,
    randomFactor = 0.2
  )
  .withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
      case msg: MessageException => {
        println("caught specific message!")
        SupervisorStrategy.Restart
      }
      case _: Exception => SupervisorStrategy.Restart
      case _ => SupervisorStrategy.Escalate
    })
)
val sup = context.actorOf(supervisor)
sup ! cmd
The child actor that is supposed to send the e-mail, but fails (throws some exception) and propagates the exception back to the supervisor:
class SenderActor() extends Actor {
  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    throw new Exception("surprising exception")
  }
  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case t => throw MessageException(cmd, t)
      }
  }
}
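In the above code I wrap any exception in a custom class MessageException, which gets propagated to the SupervisorStrategy, but how do I propagate it further to the new child to force reprocessing? Is this the right approach?
Edit. I attempted to resend the message to the actor in the preRestart hook, but somehow the hook is not being triggered: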
class SenderActor() extends Actor {
  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    // println("mail sent!")
    throw new Exception("surprising exception")
  }
  override def preStart(): Unit = {
    println("child starting")
  }
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    reason match {
      case m: MessageException => {
        println("aaaaa")
        message.foreach(self ! _)
      }
      case _ => println("bbbb")
    }
  }
  override def postStop(): Unit = {
    println("child stopping")
  }
  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case t => throw MessageException(cmd, t)
      }
  }
}
This gives me something similar to the following output:
new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting
But there is no log output from the preRestart hook.
The failed child actor is available as the sender in your supervisor strategy. Quoting https://doc.akka.io/docs/akka/current/fault-tolerance.html#creating-a-supervisor-strategy:
If the strategy is declared inside the supervising actor (as opposed
to within a companion object) its decider has access to all internal
state of the actor in a thread-safe fashion, including obtaining a
reference to the currently failed child (available as the sender of
the failure message).
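In your case, sending e-mails through some third-party software is a risky operation. Why not apply the Circuit Breaker pattern and skip the sender actor entirely? Alternatively, you can still embed an actor (with some Backoff Supervisor) together with a Circuit Breaker, if that makes sense for you.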
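To make the Circuit Breaker idea concrete, here is a minimal hand-rolled sketch of the pattern. It is not Akka's akka.pattern.CircuitBreaker: the SimpleBreaker class and its call method are hypothetical names invented for illustration, and the sketch deliberately leaves out the reset timeout and half-open state that a production breaker would have.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical illustration of the Circuit Breaker pattern; NOT Akka's CircuitBreaker.
// Not thread-safe: intended to be owned by a single caller.
final class SimpleBreaker(maxFailures: Int) {
  private var consecutiveFailures = 0

  // The breaker is open once maxFailures calls in a row have failed.
  def isOpen: Boolean = consecutiveFailures >= maxFailures

  // Runs `body` unless the breaker is open; an open breaker fails fast
  // without touching the protected resource.
  def call[A](body: => A): Try[A] =
    if (isOpen) Failure(new IllegalStateException("circuit open, failing fast"))
    else {
      val result = Try(body)
      result match {
        case Success(_) => consecutiveFailures = 0  // a success closes the streak
        case Failure(_) => consecutiveFailures += 1 // count the failure
      }
      result
    }
}
```

With maxFailures = 3, a mail call that always throws is attempted three times; later calls return a Failure immediately instead of hitting the mail service again.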
The reason the child's preRestart hook is not invoked is that Backoff.onFailure uses BackoffOnRestartSupervisor underneath the covers, which replaces the default restart behavior with a stop-and-delayed-start behavior that is consistent with the backoff policy. In other words, when using Backoff.onFailure, when a child is restarted, the child's preRestart method is not called because the underlying supervisor actually stops the child, then starts it again later. (Using Backoff.onStop can trigger the child's preRestart hook, but that is tangential to the current discussion.)
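The BackoffSupervisor API does not support automatically resending a message when the supervisor's child restarts: you have to implement this behavior yourself. One idea for retrying messages is to let the BackoffSupervisor's supervisor handle it. For example: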
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    ...
  ).withReplyWhileStopped(ChildIsStopped)
   .withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
      case msg: MessageException =>
        println("caught specific message!")
        self ! Error(msg.cmd) // replace cmd with whatever the property name is
        SupervisorStrategy.Restart
      case ...
    })
)
val sup = context.actorOf(supervisor)
def receive = {
  case cmd: NewMail =>
    sup ! cmd
  case Error(cmd) =>
    // We assume that NewMail has an id field. Also, adjust the time as needed.
    // `timers` is available when the enclosing actor mixes in akka.actor.Timers.
    timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
  case Replay(cmd) =>
    sup ! cmd
  case ChildIsStopped =>
    println("child is stopped")
}
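In the above code, the NewMail message embedded in the MessageException is wrapped in a custom case class (in order to easily distinguish it from a "normal"/new NewMail message) and sent to self. In this context, self is the actor that created the BackoffSupervisor. This enclosing actor then uses a single timer to replay the original message at some point. This point in time should be far enough in the future such that the BackoffSupervisor can potentially exhaust SenderActor's restart attempts, so that the child can have ample opportunity to get in a "good" state before it receives the resent message. Obviously this example involves only one message resend, regardless of the number of child restarts.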
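Another idea is to create a BackoffSupervisor-SenderActor pair for every NewMail message, and have the SenderActor send the NewMail message to itself in the preStart hook. One concern with this approach is the cleaning up of resources; i.e., shutting down the BackoffSupervisors (which will, in turn, shut down their respective SenderActor children) when the processing is successful or when the child restarts are exhausted. A map of NewMail ids to (ActorRef, Int) tuples (in which the ActorRef is a reference to a BackoffSupervisor actor, and the Int is the number of restart attempts) would be helpful in this case: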
class Overlord extends Actor {
  var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long
  def receive = {
    case cmd: NewMail =>
      val childProps = Props(new SenderActor(cmd, self))
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          ...
        ).withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
            case msg: MessageException =>
              println("caught specific message!")
              self ! Error(msg.cmd)
              SupervisorStrategy.Restart
            case ...
          })
      )
      val sup = context.actorOf(supervisor)
      state += (cmd.id -> (sup, 0))
    case ProcessingDone(cmdId) =>
      state.get(cmdId) match {
        case Some((backoffSup, _)) =>
          context.stop(backoffSup)
          state -= cmdId
        case None =>
          println(s"${cmdId} not found")
      }
    case Error(cmd) =>
      val cmdId = cmd.id
      state.get(cmdId) match {
        case Some((backoffSup, numRetries)) =>
          if (numRetries == 3) {
            println(s"${cmdId} has already been retried 3 times. Giving up.")
            context.stop(backoffSup)
            state -= cmdId
          } else
            state += (cmdId -> (backoffSup, numRetries + 1))
        case None =>
          println(s"${cmdId} not found")
      }
    case ...
  }
}
Note that SenderActor in the above example takes a NewMail and an ActorRef as constructor arguments. The latter allows the SenderActor to send a custom ProcessingDone message to the enclosing actor:
class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
  override def preStart(): Unit = {
    println(s"child starting, sending ${cmd} to self")
    self ! cmd
  }
  def fakeSendMail(): Unit = ...
  def receive = {
    case cmd: NewMail => ...
  }
}
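Obviously the SenderActor is set up to fail every time with the current implementation of fakeSendMail. I'll leave to you the additional changes needed in SenderActor to implement the happy path, in which SenderActor sends a ProcessingDone message to target.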
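The retry accounting done in Overlord's Error handler can also be looked at in isolation. The following is a sketch only: RetryBook, Outcome, Retrying, GaveUp and Unknown are hypothetical names, and the ActorRef half of the tuple is dropped so that the logic runs without Akka on the classpath.

```scala
// Hypothetical, Akka-free model of the bookkeeping in Overlord's Error handler.
object RetryBook {
  sealed trait Outcome
  case object Retrying extends Outcome // failure recorded, retry budget not yet spent
  case object GaveUp extends Outcome   // budget spent, id removed from the state
  case object Unknown extends Outcome  // id never registered or already cleaned up

  // Maps a mail id to the number of retries recorded so far.
  type State = Map[Long, Int]

  // Records one failure for `id` and reports what the caller should do next.
  def onError(state: State, id: Long, maxRetries: Int = 3): (State, Outcome) =
    state.get(id) match {
      case Some(n) if n >= maxRetries => (state - id, GaveUp)
      case Some(n)                    => (state.updated(id, n + 1), Retrying)
      case None                       => (state, Unknown)
    }
}
```

Starting from a count of 0, the first three failures only increment the counter; the fourth failure is the one that gives up and removes the id, which mirrors the numRetries == 3 check above. In the actor, GaveUp is the point at which context.stop(backoffSup) would be called.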
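In the good solution provided by @chunjef, he warns about the risk of scheduling the job resend before the backoff supervisor has started the worker: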
This enclosing actor then uses a single timer to replay the original message at some point. This point in time should be far enough in the future such that the BackoffSupervisor can potentially exhaust SenderActor's restart attempts, so that the child can have ample opportunity to get in a "good" state before it receives the resent message.
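If that happens, the job ends up as a dead letter and no further progress is made. I made a simplified fiddle with this scenario.
Therefore, the schedule delay should be larger than maxBackoff, which can affect job completion time.
A possible solution to avoid this scenario is to have the worker actor send a message to its parent when it is ready to work, like here.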
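As a rough way to pick that delay, the sketch below estimates an upper bound on the backoff before a given restart. It assumes the delay grows as minBackoff * 2^restartCount, is capped at maxBackoff, and then receives up to randomFactor of extra jitter, which is how the Akka documentation describes these parameters; BackoffBounds and worstCaseDelay are hypothetical names, not part of Akka.

```scala
import scala.concurrent.duration._

// Hypothetical helper; the formula is an assumption based on the documented
// meaning of minBackoff, maxBackoff and randomFactor, not a call into Akka.
object BackoffBounds {
  // Upper bound on the delay before the child is started again after
  // `restartCount` previous restarts.
  def worstCaseDelay(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double): FiniteDuration = {
    val exponential = minBackoff.toMillis.toDouble * math.pow(2, restartCount)
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    // The jitter can add up to randomFactor (e.g. 0.2 = 20%) on top of the capped delay.
    math.round(capped * (1.0 + randomFactor)).millis
  }
}
```

Under that assumption, the question's settings (minBackoff = 1.seconds, maxBackoff = 2.seconds, randomFactor = 0.2) give a bound of 2.4 seconds once the cap is reached, so a replay timer would need to exceed maxBackoff * (1 + randomFactor) rather than maxBackoff alone.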