如何使用 Akka 持久性查询检索所有日志事件?

How to retrieve all journal events using Akka persistence queries?

Akka 持久性查询具有以下预定义操作:

EventsByPersistenceId 事件标签 CurrentEventsByPersistenceId CurrentEventsByTag AllPersistenceIds

但是如果我需要获取所有过去的事件,某种 CurrentEvents 操作怎么办?我不知道如何在 Akka 持久性查询术语中实现它。

持久化查询模块我不是很熟悉,但是这些操作都是定义了一些akka-streams的源码。您可以尝试按如下方式组合它们:

  def currentEvents(fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    currentPersistenceIds().flatMapConcat(id => currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))

(具有并发宽度的 flatMapMergeflatMapConcat 的替代方案,以防您想并行化它)