Akka 持久性查询和 actor 分片

Akka Persistence Query and actor sharding

我正在做 CQRS Akka actor 应用程序的查询端。

查询参与者设置为集群分片,并填充来自一个持久性查询流的事件。

我的问题是:

  1. 如果集群碎片中的参与者之一重新启动如何恢复它?

    • 关闭整个集群分片并回复所有事件?
    • 使集群碎片中的参与者成为持久性参与者并仅保存查询端的新事件集?
  2. 如果填充了Persistence Query的actor重启了,如何取消当前PQ再启动?

如前所述,我将评估在数据库中持久化您的查询端。

如果这不是一个选项,并且您想坚持每个分片的单一持久性查询,请在您的查询参与者中执行以下操作:

var inRecovery: Boolean = true;

override def preStart( ) = {
    //Subscribe to your event live stream now, so you don't miss anything during recovery
    // e.g. send Subscription message to your persistence query actor

    //Re-Read everything up to now for recovery
    readJournal.currentEventsByPersistenceId("persistenceId")
        .watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
        .map(Replay.apply) // Mark your replay messages
        .runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
}

override def receive = {
    case Done => // Recovery is finished
        inRecovery = false
        unstashAll() // unstash all normal messages received during recovery

    case Replay( payload ) =>
        //handle replayed messages

    case events: Event =>
        //handle normal events from your persistence query
        inRecovery match {
            case true => stash() // stash normal messages until recovery is done
            case false => 
                // recovery is done, start handling normal events
        }
}


case class Replay( payload: AnyRef )

所以基本上在 actor 开始之前订阅持久性查询 actor 并使用所有过去事件的有限流恢复状态,在所有事件通过后终止。在恢复期间隐藏所有传入事件,这些事件不是重播事件。然后在恢复完成后,unstash 一切并开始处理正常消息。