如何处理从状态中删除的数据
How to handle removed data from state
我有一个 sessionization
用例。由于 mapWithstate()
,我保留了我的会话 in-memory
并为每个传入日志更新它们。当会话结束时,用特定日志发出信号,我想检索它并将其从我的 State
.
中删除
我偶然发现的问题是我无法在每个 batch
结束时检索和删除 (remove()
) 我的会话,因为检索发生在 updateFunction()
和删除之外在其中,即一旦删除会话就无法检索,并且如果会话结束,则不应再有它的日志,不再有 key
s.
我仍然可以检索已结束的会话,但 "dead" 会话的数量会增加,从而产生一个整体异常(“State
-溢出”),如果不加以控制将威胁到系统本身.此解决方案不可接受。
这似乎是一个常见的用例,我想知道是否有人提出了解决方案?
编辑
示例代码如下:
def mapWithStateContainer(iResultParsing: DStream[(String, SessionEvent)]) = {
val lStateSpec = StateSpec.function(stateUpdateFunction _).timeout(Seconds(TIMEOUT)
val lResultMapWithState: DStream[(String, Session)] =
iResultParsing.mapWithState(lStateSpec).stateSnapshots()
val lClosedSession: DStream[(String, Session)] =
lResultMapWithState.filter(_._2.mTimeout)
//ideally remove here lClosedSession from the state
}
private def stateUpdateFunction(iKey: String,
iValue: Option[SessionEvent],
iState: State[Session]): Option[(String, Session)] = {
var lResult = None: Option[(String, Session)]
if (iState.isTimingOut()) {
val lClosedSession = iState.get()
lClosedSession.mTimeout = true
lResult = Some(iKey, lClosedSession)
} else if (iState.exists) {
val lUpdatedSession = updateSession(lCurrentSession, iValue)
iState.update(lUpdatedSession)
lResult = Some(iKey, lUpdatedSession)
// we wish to remove the lUpdatedSession from the state once retrieved with lResult
/*if (lUpdatedSession.mTimeout) {
iState.remove()
lResult = None
}*/
} else {
val lInitialState = initSession(iValue)
iState.update(lInitialState)
lResult = Some(iKey, lInitialState)
}
lResult
}
private def updateSession(iCurrentSession: Session,
iNewData: Option[SessionEvent]): Session = {
//user disconnects manually
if (iNewData.get.mDisconnection) {
iCurrentSession.mTimeout = true
}
iCurrentSession
}
您可以 return 更新状态作为 mapWithState
操作的 return 值 而不是调用 MapWithStateRDD.stateSnapshot
.这样,最终状态在有状态 DStream 之外始终可用。
这意味着您可以:
else if (iState.exists) {
val lUpdatedSession = updateSession(lCurrentSession, iValue)
iState.update(lUpdatedSession)
if (lUpdatedSession.mTimeout) {
iState.remove()
}
Some(iKey, lUpdatedSession)
}
现在将图表更改为:
val lResultMapWithState = iResultParsing
.mapWithState(lStateSpec)
.filter { case (_, session) => session.mTimeout }
现在状态正在内部删除,但是因为您是从StateSpec
函数中return获取它,所以它可用给你外面做进一步处理。
我有一个 sessionization
用例。由于 mapWithstate()
,我保留了我的会话 in-memory
并为每个传入日志更新它们。当会话结束时,用特定日志发出信号,我想检索它并将其从我的 State
.
我偶然发现的问题是我无法在每个 batch
结束时检索和删除 (remove()
) 我的会话,因为检索发生在 updateFunction()
和删除之外在其中,即一旦删除会话就无法检索,并且如果会话结束,则不应再有它的日志,不再有 key
s.
我仍然可以检索已结束的会话,但 "dead" 会话的数量会增加,从而产生一个整体异常(“State
-溢出”),如果不加以控制将威胁到系统本身.此解决方案不可接受。
这似乎是一个常见的用例,我想知道是否有人提出了解决方案?
编辑
示例代码如下:
def mapWithStateContainer(iResultParsing: DStream[(String, SessionEvent)]) = {
val lStateSpec = StateSpec.function(stateUpdateFunction _).timeout(Seconds(TIMEOUT)
val lResultMapWithState: DStream[(String, Session)] =
iResultParsing.mapWithState(lStateSpec).stateSnapshots()
val lClosedSession: DStream[(String, Session)] =
lResultMapWithState.filter(_._2.mTimeout)
//ideally remove here lClosedSession from the state
}
private def stateUpdateFunction(iKey: String,
iValue: Option[SessionEvent],
iState: State[Session]): Option[(String, Session)] = {
var lResult = None: Option[(String, Session)]
if (iState.isTimingOut()) {
val lClosedSession = iState.get()
lClosedSession.mTimeout = true
lResult = Some(iKey, lClosedSession)
} else if (iState.exists) {
val lUpdatedSession = updateSession(lCurrentSession, iValue)
iState.update(lUpdatedSession)
lResult = Some(iKey, lUpdatedSession)
// we wish to remove the lUpdatedSession from the state once retrieved with lResult
/*if (lUpdatedSession.mTimeout) {
iState.remove()
lResult = None
}*/
} else {
val lInitialState = initSession(iValue)
iState.update(lInitialState)
lResult = Some(iKey, lInitialState)
}
lResult
}
private def updateSession(iCurrentSession: Session,
iNewData: Option[SessionEvent]): Session = {
//user disconnects manually
if (iNewData.get.mDisconnection) {
iCurrentSession.mTimeout = true
}
iCurrentSession
}
您可以 return 更新状态作为 mapWithState
操作的 return 值 而不是调用 MapWithStateRDD.stateSnapshot
.这样,最终状态在有状态 DStream 之外始终可用。
这意味着您可以:
else if (iState.exists) {
val lUpdatedSession = updateSession(lCurrentSession, iValue)
iState.update(lUpdatedSession)
if (lUpdatedSession.mTimeout) {
iState.remove()
}
Some(iKey, lUpdatedSession)
}
现在将图表更改为:
val lResultMapWithState = iResultParsing
.mapWithState(lStateSpec)
.filter { case (_, session) => session.mTimeout }
现在状态正在内部删除,但是因为您是从StateSpec
函数中return获取它,所以它可用给你外面做进一步处理。