Akka 持久性:ReadJournal.runFold 从不 returns

Akka Persistence: ReadJournal.runFold never returns

我正在尝试使用 Akka,更具体地说,这是第一次使用 Akka Persistence。我最终试图实现一个小玩具程序来复制 Akka 在事件源应用程序中的使用。在我尝试使用 ReadJournal 将我的事件流投射到我的域中之前,我一直取得成功。

def main(args: Array[String]): Unit = {
    val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())

    implicit val executionContext = ExecutionContext.global
    implicit val system = ActorSystem.create("employee-service-actor-system")
    implicit val mat: Materializer = ActorMaterializer()(system)

    val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))

    commands.stream.foreach(command => service.tell(command, noSender))

    lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
      .asInstanceOf[ReadJournal
      with CurrentPersistenceIdsQuery
      with CurrentEventsByPersistenceIdQuery
      with CurrentEventsByTagQuery
      with EventsByPersistenceIdQuery
      with EventsByTagQuery]

    println(Await.result(
      readJournal
        .eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
        .map(_.event)
        .runFold(Employee.apply())({
          case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
        }),
      Duration("10s")
    ))     
}

我的域的唯一聚合是 Employee,所以我只是用代表某个员工的 UUID 启动一个演员,然后我为那个员工发出一些命令。

在上面的示例中,如果我删除 println(Await.result(...)) 并将 .runFold(...) 替换为 .runForeach(println),我的 actor 中保留的事件将按预期为每个给定命令打印。所以我知道我的程序的写入端和 ReadJournal 都按预期工作。

按原样,我的程序以

终止
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]

所以现在我的问题是,为什么我不能执行 runFold 以最终重播我的事件流?有一个更好的方法吗?我只是在滥用 API 吗?

任何帮助将不胜感激,谢谢!

使用runFold,您正在对流进行折叠。当流本身终止时,折叠将有效终止。

通过使用 eventsByPersistenceId,您要求的是永无止境的直播活动流,因此您的弃牌不会终止。

对于您的用例,您应该使用 currentEventsByPersistenceId。此变体将流式传输日志中当前可用的事件并终止。

https://doc.akka.io/docs/akka/2.5.6/scala/persistence-query.html#eventsbypersistenceidquery-and-currenteventsbypersistenceidquery