只读一遍日记

Read journal exactly once

我正在使用 akka-persistence 的 PersistenceQuery 将初始状态加载到管理内容的 actor。我希望它在启动时只重播一次,但它一直将这些发送到日志。

14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]

这是我为实现它而编写的程序。

implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
      LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
  case x: RoomCreated => process(x)
  case x: RoomDeleted => process(x)
  case x => logger.error(s"Could not spawn $x")
})

我认为您预期的行为与您实际看到的不同之处在于 eventsByPersistenceId 是一个 "live" 流。这意味着它不仅 return 事件会在您提供的偏移量范围内开始(您从 0 开始并转到 Long.MaxValue,所以一切),而且如果新事件来了还会继续向您发送in. 如果您不想直播,请将呼叫更改为 currentEventsByPersistenceId。这将只包括那个时间点(您提出请求的时间)之前的事情,而不是直播。那应该是你要找的。