Akka 持久性:ReadJournal.runFold 从不 returns
Akka Persistence: ReadJournal.runFold never returns
我正在尝试使用 Akka,更具体地说,这是第一次使用 Akka Persistence。我最终试图实现一个小玩具程序来复制 Akka 在事件源应用程序中的使用。在我尝试使用 ReadJournal
将我的事件流投射到我的域中之前,我一直取得成功。
def main(args: Array[String]): Unit = {
val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())
implicit val executionContext = ExecutionContext.global
implicit val system = ActorSystem.create("employee-service-actor-system")
implicit val mat: Materializer = ActorMaterializer()(system)
val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))
commands.stream.foreach(command => service.tell(command, noSender))
lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal
with CurrentPersistenceIdsQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByPersistenceIdQuery
with EventsByTagQuery]
println(Await.result(
readJournal
.eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
.map(_.event)
.runFold(Employee.apply())({
case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
}),
Duration("10s")
))
}
我的域的唯一聚合是 Employee
,所以我只是用代表某个员工的 UUID 启动一个演员,然后我为那个员工发出一些命令。
在上面的示例中,如果我删除 println(Await.result(...))
并将 .runFold(...)
替换为 .runForeach(println)
,我的 actor 中保留的事件将按预期为每个给定命令打印。所以我知道我的程序的写入端和 ReadJournal
都按预期工作。
按原样,我的程序以
终止
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
所以现在我的问题是,为什么我不能执行 runFold
以最终重播我的事件流?有一个更好的方法吗?我只是在滥用 API 吗?
任何帮助将不胜感激,谢谢!
使用runFold
,您正在对流进行折叠。当流本身终止时,折叠将有效终止。
通过使用 eventsByPersistenceId
,您要求的是永无止境的直播活动流,因此您的弃牌不会终止。
对于您的用例,您应该使用 currentEventsByPersistenceId
。此变体将流式传输日志中当前可用的事件并终止。
我正在尝试使用 Akka,更具体地说,这是第一次使用 Akka Persistence。我最终试图实现一个小玩具程序来复制 Akka 在事件源应用程序中的使用。在我尝试使用 ReadJournal
将我的事件流投射到我的域中之前,我一直取得成功。
def main(args: Array[String]): Unit = {
val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())
implicit val executionContext = ExecutionContext.global
implicit val system = ActorSystem.create("employee-service-actor-system")
implicit val mat: Materializer = ActorMaterializer()(system)
val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))
commands.stream.foreach(command => service.tell(command, noSender))
lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal
with CurrentPersistenceIdsQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByPersistenceIdQuery
with EventsByTagQuery]
println(Await.result(
readJournal
.eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
.map(_.event)
.runFold(Employee.apply())({
case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
}),
Duration("10s")
))
}
我的域的唯一聚合是 Employee
,所以我只是用代表某个员工的 UUID 启动一个演员,然后我为那个员工发出一些命令。
在上面的示例中,如果我删除 println(Await.result(...))
并将 .runFold(...)
替换为 .runForeach(println)
,我的 actor 中保留的事件将按预期为每个给定命令打印。所以我知道我的程序的写入端和 ReadJournal
都按预期工作。
按原样,我的程序以
终止Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
所以现在我的问题是,为什么我不能执行 runFold
以最终重播我的事件流?有一个更好的方法吗?我只是在滥用 API 吗?
任何帮助将不胜感激,谢谢!
使用runFold
,您正在对流进行折叠。当流本身终止时,折叠将有效终止。
通过使用 eventsByPersistenceId
,您要求的是永无止境的直播活动流,因此您的弃牌不会终止。
对于您的用例,您应该使用 currentEventsByPersistenceId
。此变体将流式传输日志中当前可用的事件并终止。