只读一遍日记
Read journal exactly once
我正在使用 akka-persistence 的 PersistenceQuery 将初始状态加载到管理内容的 actor。我希望它在启动时只重播一次,但它一直将这些发送到日志。
14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
这是我为实现它而编写的程序。
implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
case x: RoomCreated => process(x)
case x: RoomDeleted => process(x)
case x => logger.error(s"Could not spawn $x")
})
我认为您预期的行为与您实际看到的不同之处在于 eventsByPersistenceId
是一个 "live" 流。这意味着它不仅 return 事件会在您提供的偏移量范围内开始(您从 0 开始并转到 Long.MaxValue,所以一切),而且如果新事件来了还会继续向您发送in. 如果您不想直播,请将呼叫更改为 currentEventsByPersistenceId
。这将只包括那个时间点(您提出请求的时间)之前的事情,而不是直播。那应该是你要找的。
我正在使用 akka-persistence 的 PersistenceQuery 将初始状态加载到管理内容的 actor。我希望它在启动时只重播一次,但它一直将这些发送到日志。
14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
这是我为实现它而编写的程序。
implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
case x: RoomCreated => process(x)
case x: RoomDeleted => process(x)
case x => logger.error(s"Could not spawn $x")
})
我认为您预期的行为与您实际看到的不同之处在于 eventsByPersistenceId
是一个 "live" 流。这意味着它不仅 return 事件会在您提供的偏移量范围内开始(您从 0 开始并转到 Long.MaxValue,所以一切),而且如果新事件来了还会继续向您发送in. 如果您不想直播,请将呼叫更改为 currentEventsByPersistenceId
。这将只包括那个时间点(您提出请求的时间)之前的事情,而不是直播。那应该是你要找的。