在 Akka 中使用重新启动的 actor 进行消息持久化和回放
Message Persistence and Playback with restarted actors in Akka
阿卡 Java 这里。我有两个演员,Parent
和 Child
,前者是后者的父级。如果 Child
抛出一个特定的异常(比如 UnrulyTeenagerExcepton
),那么我正在寻找的行为如下:
Parent
保存了对抛出异常时 Child
正在处理的消息的引用;然后
Child
重新启动,持久化消息“回放”到 Child
;但是
- 如果这个保存 -> 重新启动 -> 重放循环发生三次,并且
Child
抛出 UnrulyTeenagerException
三次,那么 我们 SupervisorStrategy.escalate()
迄今为止我最好的尝试:
// Groovy pseudo-code
class ChildFailureDecider extends Function<Throwable,Directive> {
int maxRetries = 3
int numRetries = 0
@Override
Directive apply(Throwable childFailure) {
if(childFailure instanceof UnrulyTeenagerException) {
numRetries++
if(numRetries <= maxRetries) {
// TODO: #1 How to persist the message that caused the ‘childFailure’?
return SupervisorStrategy.restart()
// TODO: #2 How to ‘play back’ the persisted message to Child?
}
}
SupervisorStrategy.escalate()
}
}
但如您所见,我在消息持久化和回放方面遇到了困难。有任何想法吗? Java 代码示例非常感谢,Akka 足够强大,无需学习 Scala 象形文字!
Akka 持久化是关于以持久的方式(例如到磁盘或数据库)以持久的方式记录事件(与消息不同),这样如果您的整个应用程序终止(例如 JVM 崩溃)或硬件故障),可以在重启时重建该 actor 的状态。在你的情况下,你想记住发送给单个演员的消息并在该演员因故障重新启动时重新发送它,所以我认为在这种情况下你不需要持久性API。
当一个参与者抛出一个异常时,该异常会被提交给监督者,但导致它的消息不会。我认为没有内置的方法可以实现这一点。监管者可以管理只重启3次的规则,通过设置合适的监管策略参数:http://doc.akka.io/japi/akka/2.4-M3/akka/actor/OneForOneStrategy.html#OneForOneStrategy-int-scala.concurrent.duration.Duration-akka.japi.Function-
消息的重放需要发件人处理。您可以通过让接收方在处理完消息后向发送方发送确认来实现至少一次语义,如果未收到确认,则让发送方定期重试。有关详细信息,请参阅此问题:
抱歉缺少代码,但我使用 Scala API 而不是 Java。
阿卡 Java 这里。我有两个演员,Parent
和 Child
,前者是后者的父级。如果 Child
抛出一个特定的异常(比如 UnrulyTeenagerExcepton
),那么我正在寻找的行为如下:
Parent
保存了对抛出异常时Child
正在处理的消息的引用;然后Child
重新启动,持久化消息“回放”到Child
;但是- 如果这个保存 -> 重新启动 -> 重放循环发生三次,并且
Child
抛出UnrulyTeenagerException
三次,那么 我们SupervisorStrategy.escalate()
迄今为止我最好的尝试:
// Groovy pseudo-code
class ChildFailureDecider extends Function<Throwable,Directive> {
int maxRetries = 3
int numRetries = 0
@Override
Directive apply(Throwable childFailure) {
if(childFailure instanceof UnrulyTeenagerException) {
numRetries++
if(numRetries <= maxRetries) {
// TODO: #1 How to persist the message that caused the ‘childFailure’?
return SupervisorStrategy.restart()
// TODO: #2 How to ‘play back’ the persisted message to Child?
}
}
SupervisorStrategy.escalate()
}
}
但如您所见,我在消息持久化和回放方面遇到了困难。有任何想法吗? Java 代码示例非常感谢,Akka 足够强大,无需学习 Scala 象形文字!
Akka 持久化是关于以持久的方式(例如到磁盘或数据库)以持久的方式记录事件(与消息不同),这样如果您的整个应用程序终止(例如 JVM 崩溃)或硬件故障),可以在重启时重建该 actor 的状态。在你的情况下,你想记住发送给单个演员的消息并在该演员因故障重新启动时重新发送它,所以我认为在这种情况下你不需要持久性API。
当一个参与者抛出一个异常时,该异常会被提交给监督者,但导致它的消息不会。我认为没有内置的方法可以实现这一点。监管者可以管理只重启3次的规则,通过设置合适的监管策略参数:http://doc.akka.io/japi/akka/2.4-M3/akka/actor/OneForOneStrategy.html#OneForOneStrategy-int-scala.concurrent.duration.Duration-akka.japi.Function-
消息的重放需要发件人处理。您可以通过让接收方在处理完消息后向发送方发送确认来实现至少一次语义,如果未收到确认,则让发送方定期重试。有关详细信息,请参阅此问题:
抱歉缺少代码,但我使用 Scala API 而不是 Java。