Akka 持久化查询事件流和 CQRS

Akka Persistence Query event stream and CQRS

我正在尝试在我的 ES-CQRS 架构中实现读取端。比方说我有这样一个坚持不懈的演员:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

据我了解,每次事件持久化时,我们都可以通过 Akka Persistence Query 发布它。现在,我不确定订阅这些事件的正确方法是什么,以便我可以将其保存在我的读取端数据库中?其中一个想法是最初将 UsersStream 消息从我的阅读方演员发送到 UserWrite 演员和该阅读演员中的 "sink" 事件。

编辑

根据@cmbaxter 的建议,我以这种方式实现了读取端:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

存在一些问题,例如:事件流似乎很慢。 IE。 UserRead 参与者可以在保存新添加的用户之前用一组用户回答。

编辑 2

我增加了 cassandra 查询日志的刷新间隔,这更多地解决了慢速事件流的问题。似乎 Cassandra 事件日志是默认的,每 3 秒轮询一次。在我的 application.conf 中,我添加了:

cassandra-query-journal {
  refresh-interval = 20ms
}

编辑 3

实际上,不要减少刷新间隔。这会增加内存使用量,但这并不危险,也不是重点。 CQRS 的一般概念是写入和读取端是异步的。因此,写入数据后永远无法立即读取。处理 UI?我只是打开流并在读取端确认后通过服务器发送的事件推送数据。

有一些方法可以做到这一点。例如,在我的应用程序中,我的查询端有一个 actor,它有一个持续寻找更改的 PersistenceQuery,但您也可以有一个具有相同查询的线程。事情是保持流打开以便能够在事件发生后立即读取持久化事件

val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
  CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
  case userEvent: UserEvent => {
    doSomething(userEvent)
  }
}

除此之外,您可以使用一个计时器来引发 PersistenceQuery 并存储新事件,但我认为打开流是最好的方法

虽然仅使用 PersistenceQuery 的解决方案获得通过,但它存在以下问题:

  1. 是部分的,只提供了读取EventEnvelopes的方法。
  2. 它不能与状态快照一起使用,因此,CQRS Reader 部分应该结束了 所有持续存在的事件一直存在。

第一个解决方案更好,但存在以下问题:

  1. 太复杂了。它会导致用户不必要地处理序列号。
  2. 代码处理状态 (query/update) 与 Actors 实现相结合。

还有一个更简单的:

import akka.NotUsed
import akka.actor.{Actor, ActorLogging}
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal}
import akka.persistence._
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source

/**
  * Created by alexv on 4/26/2017.
  */
class CQRSTest {

  // User Command, will be transformed to User Event
  sealed trait UserCommand
  // User Event
  // let's assume some conversion from Command to event here
  case class PersistedEvent(command: UserCommand) extends Serializable
  // User State, for simplicity assumed that all State will be snapshotted
  sealed trait State extends Serializable{
    def clear(): Unit
    def updateState(event: PersistedEvent): Unit
    def validateCommand(command:UserCommand): Boolean
    def applyShapshot(newState: State): Unit
    def getShapshot() : State
  }
  case class SaveSnapshot()

  /**
    * Common code for Both reader and writer
    * @param state - State
    */
  abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging {
    override def persistenceId: String = "CQRSPersistenceId"

    override def preStart(): Unit = {
      // Since the state is external and not depends to Actor's failure or restarts it should be cleared.
      state.clear()
    }

    override def receiveRecover: Receive = {
      case event : PersistedEvent => state.updateState(event)
      case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot)
      case RecoveryCompleted  => onRecoveryCompleted(super.lastSequenceNr)
    }

    abstract def onRecoveryCompleted(lastSequenceNr:Long)
  }

  class CQRSWriter(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSWriter Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed")
    }

    override def receiveCommand: Receive = {
      case command: UserCommand =>
        if(state.validateCommand(command)) {
          // Persist events and call state.updateState with each persisted event
          persistAll(List(PersistedEvent(command)))(state.updateState)
        }
        else {
          log.error("Validation Failed for Command: {}", command)
        }
      case SaveSnapshot => saveSnapshot(state.getShapshot())
      case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata)
      case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason)
    }
  }

  class CQRSReader(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSReader Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed, Starting QueryStream")

      // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests)
      val readJournal = PersistenceQuery(context.system).readJournalFor(
        context.system.settings.config.getString("akka.persistence.query.my-read-journal"))
        .asInstanceOf[ReadJournal
        with EventsByPersistenceIdQuery]
      val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
        OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue)
      source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer())

    }

    // Nothing received since it is Reader only
    override def receiveCommand: Receive = Actor.emptyBehavior
  }
}