删除持久消息
Akka delete persistent message(s)
我正在编写一个应用程序来处理设备引发的事件(每小时 100 万)。一些事件将被聚合(并且具有很长的时间跨度(例如 48 小时)),其中包含开始事件、状态(x 次)事件和结束事件。其他是可以立即处理的单一事件。为了至少保证一次事件将被处理,我正在研究 akka-persistence。应用程序的其他部分已经使用 akka 和 kafka。
我的目标解决方案应该包含一个持久映射,其中可以通过事件 ID 轻松挑选事件。顺序不太重要。事件处理完成后,可以将其从地图中删除(并且不应再保留)。
在找到的文档/示例中,我找到了满足每个事件清除要求的队列示例,但很难轻松查找(必须循环队列才能找到事件)。为了满足简单的查找,我想到了使用地图,使用 PersistentActor 特征和下面的一些数据库。然而,事件被序列号清除(这将删除需要更多处理/正在等待其他事件发生的事件)。另一个被调查的特征是 AtLeastOnceDelivery,其交付确认满足要求,但在处理完所有事件之前,这个特征会在恢复时阻塞。
关于如何在 Akka 中实现持久的事件篮子有什么想法吗? (顺便说一句,我正在使用 scala)
这样的东西能满足您的需求吗?
这可能不完全是你的逻辑,但基本上它接收到一个新事件,坚持它接收到事件的事实,然后使用 id 将事件保存到地图中。
然后在某个时候(不确定你如何触发事件处理)它收到命令来处理具有特定 ID 的事件。它坚持它应该处理事件的事实,然后处理事件并将其从地图中删除。
这样地图将在重新启动时恢复,并且您可以通过 Id 访问所有尚未处理的事件。
class PersistentMapActor extends PersistentActor {
private var eventMap: Map[ Int, Event ] = Map[ Int, Event ]( )
override def receiveRecover: Receive = {
case NewEventSaved( payload: Event ) =>
eventMap = eventMap + ( (payload.eventId, payload) )
case EventHandled( eventId ) =>
eventMap = eventMap - eventId
}
override def receiveCommand: Receive = {
case SaveNewEvent( payload ) =>
persist( NewEventSaved( payload ) ) { persistedNewEvent =>
//Add new event to map
eventMap = eventMap + ( (payload.eventId, payload) )
}
case HandleEvent( eventId ) =>
val event = eventMap.get( eventId )
event.foreach { e =>
persist( EventHandled( eventId ) ) { persistedEvent =>
//Do stuff with the event
//Remove the event from the map
eventMap = eventMap - eventId
}
}
}
override def persistenceId: String = "PersistentMapActor"
}
object PersistentMapActor {
case class Event( eventId: Int, someField: String )
case class SaveNewEvent( payload: Event )
case class NewEventSaved( payload: Event )
case class HandleEvent( eventId: Int )
case class EventHandled( eventId: Int )
}
我正在编写一个应用程序来处理设备引发的事件(每小时 100 万)。一些事件将被聚合(并且具有很长的时间跨度(例如 48 小时)),其中包含开始事件、状态(x 次)事件和结束事件。其他是可以立即处理的单一事件。为了至少保证一次事件将被处理,我正在研究 akka-persistence。应用程序的其他部分已经使用 akka 和 kafka。
我的目标解决方案应该包含一个持久映射,其中可以通过事件 ID 轻松挑选事件。顺序不太重要。事件处理完成后,可以将其从地图中删除(并且不应再保留)。
在找到的文档/示例中,我找到了满足每个事件清除要求的队列示例,但很难轻松查找(必须循环队列才能找到事件)。为了满足简单的查找,我想到了使用地图,使用 PersistentActor 特征和下面的一些数据库。然而,事件被序列号清除(这将删除需要更多处理/正在等待其他事件发生的事件)。另一个被调查的特征是 AtLeastOnceDelivery,其交付确认满足要求,但在处理完所有事件之前,这个特征会在恢复时阻塞。
关于如何在 Akka 中实现持久的事件篮子有什么想法吗? (顺便说一句,我正在使用 scala)
这样的东西能满足您的需求吗? 这可能不完全是你的逻辑,但基本上它接收到一个新事件,坚持它接收到事件的事实,然后使用 id 将事件保存到地图中。 然后在某个时候(不确定你如何触发事件处理)它收到命令来处理具有特定 ID 的事件。它坚持它应该处理事件的事实,然后处理事件并将其从地图中删除。 这样地图将在重新启动时恢复,并且您可以通过 Id 访问所有尚未处理的事件。
class PersistentMapActor extends PersistentActor {
private var eventMap: Map[ Int, Event ] = Map[ Int, Event ]( )
override def receiveRecover: Receive = {
case NewEventSaved( payload: Event ) =>
eventMap = eventMap + ( (payload.eventId, payload) )
case EventHandled( eventId ) =>
eventMap = eventMap - eventId
}
override def receiveCommand: Receive = {
case SaveNewEvent( payload ) =>
persist( NewEventSaved( payload ) ) { persistedNewEvent =>
//Add new event to map
eventMap = eventMap + ( (payload.eventId, payload) )
}
case HandleEvent( eventId ) =>
val event = eventMap.get( eventId )
event.foreach { e =>
persist( EventHandled( eventId ) ) { persistedEvent =>
//Do stuff with the event
//Remove the event from the map
eventMap = eventMap - eventId
}
}
}
override def persistenceId: String = "PersistentMapActor"
}
object PersistentMapActor {
case class Event( eventId: Int, someField: String )
case class SaveNewEvent( payload: Event )
case class NewEventSaved( payload: Event )
case class HandleEvent( eventId: Int )
case class EventHandled( eventId: Int )
}