为什么 Akka Persisence Query Read Journal 不检索我的事件?
Why doesn't the Akka Persisence Query Read Journal retrieve my events?
我在理解 Akka 持久性查询时遇到问题,尤其是 eventsByTag 方法,因为它的行为与我预期的不同。
在我的主 class 中,我调用了一个 class,它开始监听任何使用特定标签持久化的事件。
class CassandraJournal(implicit val system: ActorSystem) {
def engageStreaming = {
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val mat = ActorMaterializer()
readJournal.eventsByTag("account", Offset.noOffset)
.runForeach { event => println(event) }
}
}
每当我启动我的服务器并且我的事件存储是空的并且我坚持我的第一个事件(通过调用一个内置于 Akka HTTP 的 http 服务)时,该事件确实被打印出来。但是,当我重新启动服务器并且事件存储中已经有事件时,不会打印新的持久化事件。
这有解释吗?我很难弄清楚为什么会这样。
编辑
我使用的事件存储是 Cassandra。这是 PersistentActor(我没有使用事件适配器来标记事件,只是将它们包裹在 Tagged() 周围)
class Account(id: UUID) extends PersistentActor {
override def receiveRecover: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
println("Creating checkings account")
}
override def receiveCommand: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
}
}
def updateState(evt: Event): Unit = {
}
override def persistenceId: String = s"account-$id"
}
由于 receiveRecover
没有进行必要的状态恢复工作,持久性将无法正常工作。我建议在 receiveRecover
中放置一些基本的状态恢复逻辑,并让你的 updateState
方法涵盖也标记事件案例。
我在具有类似于以下状态恢复逻辑的应用程序中使用了 eventsByTag
,它在重新启动和恢复时都运行良好。
def updateState(e: Any): Unit = e match {
case evt: Event =>
state = state.updated(evt)
case Tagged(evt: Event, _) =>
state = state.updated(evt)
}
...
override def receiveRecover: Receive = {
case evt: Event => updateState(evt)
case taggedEvt: Tagged => updateState(taggedEvt)
}
我在理解 Akka 持久性查询时遇到问题,尤其是 eventsByTag 方法,因为它的行为与我预期的不同。
在我的主 class 中,我调用了一个 class,它开始监听任何使用特定标签持久化的事件。
class CassandraJournal(implicit val system: ActorSystem) {
def engageStreaming = {
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val mat = ActorMaterializer()
readJournal.eventsByTag("account", Offset.noOffset)
.runForeach { event => println(event) }
}
}
每当我启动我的服务器并且我的事件存储是空的并且我坚持我的第一个事件(通过调用一个内置于 Akka HTTP 的 http 服务)时,该事件确实被打印出来。但是,当我重新启动服务器并且事件存储中已经有事件时,不会打印新的持久化事件。
这有解释吗?我很难弄清楚为什么会这样。
编辑
我使用的事件存储是 Cassandra。这是 PersistentActor(我没有使用事件适配器来标记事件,只是将它们包裹在 Tagged() 周围)
class Account(id: UUID) extends PersistentActor {
override def receiveRecover: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
println("Creating checkings account")
}
override def receiveCommand: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
}
}
def updateState(evt: Event): Unit = {
}
override def persistenceId: String = s"account-$id"
}
由于 receiveRecover
没有进行必要的状态恢复工作,持久性将无法正常工作。我建议在 receiveRecover
中放置一些基本的状态恢复逻辑,并让你的 updateState
方法涵盖也标记事件案例。
我在具有类似于以下状态恢复逻辑的应用程序中使用了 eventsByTag
,它在重新启动和恢复时都运行良好。
def updateState(e: Any): Unit = e match {
case evt: Event =>
state = state.updated(evt)
case Tagged(evt: Event, _) =>
state = state.updated(evt)
}
...
override def receiveRecover: Receive = {
case evt: Event => updateState(evt)
case taggedEvt: Tagged => updateState(taggedEvt)
}