如何使用 Akka 持久性查询检索所有日志事件?
How to retrieve all journal events using Akka persistence queries?
Akka 持久性查询具有以下预定义操作:
EventsByPersistenceId
事件标签
CurrentEventsByPersistenceId
CurrentEventsByTag
AllPersistenceIds
但是如果我需要获取所有过去的事件,某种 CurrentEvents 操作怎么办?我不知道如何在 Akka 持久性查询术语中实现它。
持久化查询模块我不是很熟悉,但是这些操作都是定义了一些akka-streams的源码。您可以尝试按如下方式组合它们:
def currentEvents(fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
currentPersistenceIds().flatMapConcat(id => currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))
(具有并发宽度的 flatMapMerge
是 flatMapConcat
的替代方案,以防您想并行化它)
Akka 持久性查询具有以下预定义操作:
EventsByPersistenceId 事件标签 CurrentEventsByPersistenceId CurrentEventsByTag AllPersistenceIds
但是如果我需要获取所有过去的事件,某种 CurrentEvents 操作怎么办?我不知道如何在 Akka 持久性查询术语中实现它。
持久化查询模块我不是很熟悉,但是这些操作都是定义了一些akka-streams的源码。您可以尝试按如下方式组合它们:
def currentEvents(fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
currentPersistenceIds().flatMapConcat(id => currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))
(具有并发宽度的 flatMapMerge
是 flatMapConcat
的替代方案,以防您想并行化它)