如何在 Akka Persistence 中获取导致重启的消息
How to get message causing restart in Akka Persistence
我有 PersistenceActor
,我想根据导致重新启动的消息在 preRestart
方法中做一些事情。对于普通演员来说这很容易,因为有消息传递给 preRestart
方法:
def preRestart(reason: Throwable, message: Option[Any])
但是对于 PersistentActor
不能以这种方式完成,因为每次 None
都作为消息属性传递。是Eventsourced.scala
:
那段代码引起的
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try {
internalStash.unstashAll()
unstashAll(unstashFilterPredicate)
} finally {
message match {
case Some(WriteMessageSuccess(m, _)) ⇒
flushJournalBatch()
super.aroundPreRestart(reason, Some(m))
case Some(LoopMessageSuccess(m, _)) ⇒
flushJournalBatch()
super.aroundPreRestart(reason, Some(m))
case Some(ReplayedMessage(m)) ⇒
flushJournalBatch()
super.aroundPreRestart(reason, Some(m))
case mo ⇒
flushJournalBatch()
super.aroundPreRestart(reason, None)
}
}
}
有谁知道为什么 None
被传递到那里,丢失了原始消息?
这似乎是一个疏忽,我们应该在 Akka 中修复它,我已经打开了一个跟踪修复它的工单:https://github.com/akka/akka/issues/21824
我有 PersistenceActor
,我想根据导致重新启动的消息在 preRestart
方法中做一些事情。对于普通演员来说这很容易,因为有消息传递给 preRestart
方法:
def preRestart(reason: Throwable, message: Option[Any])
但是对于 PersistentActor
不能以这种方式完成,因为每次 None
都作为消息属性传递。是Eventsourced.scala
:
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try {
internalStash.unstashAll()
unstashAll(unstashFilterPredicate)
} finally {
message match {
case Some(WriteMessageSuccess(m, _)) ⇒
flushJournalBatch()
super.aroundPreRestart(reason, Some(m))
case Some(LoopMessageSuccess(m, _)) ⇒
flushJournalBatch()
super.aroundPreRestart(reason, Some(m))
case Some(ReplayedMessage(m)) ⇒
flushJournalBatch()
super.aroundPreRestart(reason, Some(m))
case mo ⇒
flushJournalBatch()
super.aroundPreRestart(reason, None)
}
}
}
有谁知道为什么 None
被传递到那里,丢失了原始消息?
这似乎是一个疏忽,我们应该在 Akka 中修复它,我已经打开了一个跟踪修复它的工单:https://github.com/akka/akka/issues/21824