如何推迟 Spark/RabbitMQ 中的活动
How to postpone an event in Spark/RabbitMQ
我们正在设计一个系统,该系统将使用 RabbitMQ(可能是后来的 kafka)和 spark streaming 流式传输事件。一些或我们的活动已分解为几种活动类型,因此不会有太大的活动。这意味着某些事件必须等待其他事件(具有相同的 ID)。在特定事件的所有事件都到达之前,我们无法继续处理。
有没有办法在 spark streaming 中延迟一个事件的处理直到下一次处理window(如果另一个事件还没有到达)
谢谢
尼尔
从架构的角度,有几个问题需要考虑:
- 如何确定所有事件都已到达?
- 如果一个事件丢失了怎么办?
- 如果事件无序到达会怎样?倒数第一和相似?
从原则上讲,将原本作为一个整体形成的事件分解成多个部分似乎会增加复杂性并影响系统的可靠性。
无论如何要回答这个问题,自 Spark 1.6.x 以来,引入了一个新的有状态流函数:mapWithState
。 mapWithState
允许您保留每个键的状态信息并发出零个或多个相同或不同类型的事件以响应传入事件。
应用于这种情况,我们可以将状态建模为 State[PartialEvent]
:随着事件的到来,它们被组装在一个 PartialEvent
对象中。一旦满足事件完成的条件,mapWithState 可以生成一个 WholeEvent
对象以供下游处理。
该过程大致(*)如下所示:
val sourceEventDStream:DStream[Event] = ???
def stateUpdateFunction(eventId:String, event: Event, partialEventState: State[PartialEvent]): Option[WholeEvent] = {
val eventState = partialEventState.get() // Get current state of the event
val updatedEvent = merge(eventState, event)
if (updatedEvent.isComplete) {
partialEventState.remove()
Some(WholeEvent(updatedEvent))
} else {
partialEventState.update(updatedEvent)
None
}
}
val wholeEventDStream:DStream[WholeEvent] = sourceEventDStream.mapWithState(StateSpec.function(stateUpdateFunction))
//do stuff with wholeEventDStream ...
正如您所观察到的,通过这种方法,任何从未完成的 PartialEvent
都将永远处于该状态。我们还需要一个唯一的键来识别属于一起的事件。必须考虑超时选项以涵盖失败情况,但最重要的是,如果技术上可行,通过管道保留整个事件将是更好的方法。
(*) 未编译或测试。提供只是为了说明这个想法。
我们正在设计一个系统,该系统将使用 RabbitMQ(可能是后来的 kafka)和 spark streaming 流式传输事件。一些或我们的活动已分解为几种活动类型,因此不会有太大的活动。这意味着某些事件必须等待其他事件(具有相同的 ID)。在特定事件的所有事件都到达之前,我们无法继续处理。
有没有办法在 spark streaming 中延迟一个事件的处理直到下一次处理window(如果另一个事件还没有到达)
谢谢 尼尔
从架构的角度,有几个问题需要考虑:
- 如何确定所有事件都已到达?
- 如果一个事件丢失了怎么办?
- 如果事件无序到达会怎样?倒数第一和相似?
从原则上讲,将原本作为一个整体形成的事件分解成多个部分似乎会增加复杂性并影响系统的可靠性。
无论如何要回答这个问题,自 Spark 1.6.x 以来,引入了一个新的有状态流函数:mapWithState
。 mapWithState
允许您保留每个键的状态信息并发出零个或多个相同或不同类型的事件以响应传入事件。
应用于这种情况,我们可以将状态建模为 State[PartialEvent]
:随着事件的到来,它们被组装在一个 PartialEvent
对象中。一旦满足事件完成的条件,mapWithState 可以生成一个 WholeEvent
对象以供下游处理。
该过程大致(*)如下所示:
val sourceEventDStream:DStream[Event] = ???
def stateUpdateFunction(eventId:String, event: Event, partialEventState: State[PartialEvent]): Option[WholeEvent] = {
val eventState = partialEventState.get() // Get current state of the event
val updatedEvent = merge(eventState, event)
if (updatedEvent.isComplete) {
partialEventState.remove()
Some(WholeEvent(updatedEvent))
} else {
partialEventState.update(updatedEvent)
None
}
}
val wholeEventDStream:DStream[WholeEvent] = sourceEventDStream.mapWithState(StateSpec.function(stateUpdateFunction))
//do stuff with wholeEventDStream ...
正如您所观察到的,通过这种方法,任何从未完成的 PartialEvent
都将永远处于该状态。我们还需要一个唯一的键来识别属于一起的事件。必须考虑超时选项以涵盖失败情况,但最重要的是,如果技术上可行,通过管道保留整个事件将是更好的方法。
(*) 未编译或测试。提供只是为了说明这个想法。