如何处理从状态中删除的数据

How to handle removed data from state

我有一个 sessionization 用例。由于 mapWithstate(),我保留了我的会话 in-memory 并为每个传入日志更新它们。当会话结束时,用特定日志发出信号,我想检索它并将其从我的 State.

中删除

我偶然发现的问题是我无法在每个 batch 结束时检索和删除 (remove()) 我的会话,因为检索发生在 updateFunction() 和删除之外在其中,即一旦删除会话就无法检索,并且如果会话结束,则不应再有它的日志,不再有 keys.

我仍然可以检索已结束的会话,但 "dead" 会话的数量会增加,从而产生一个整体异常(“State-溢出”),如果不加以控制将威胁到系统本身.此解决方案不可接受。

这似乎是一个常见的用例,我想知道是否有人提出了解决方案?


编辑

示例代码如下:

def mapWithStateContainer(iResultParsing: DStream[(String, SessionEvent)]) = {
  val lStateSpec = StateSpec.function(stateUpdateFunction _).timeout(Seconds(TIMEOUT)

  val lResultMapWithState: DStream[(String, Session)] = 
        iResultParsing.mapWithState(lStateSpec).stateSnapshots()

  val lClosedSession: DStream[(String, Session)] = 
        lResultMapWithState.filter(_._2.mTimeout)

    //ideally remove here lClosedSession from the state
}

private def stateUpdateFunction(iKey: String,
                                iValue: Option[SessionEvent],
                                iState: State[Session]): Option[(String, Session)] = {
  var lResult = None: Option[(String, Session)]

  if (iState.isTimingOut()) {
    val lClosedSession = iState.get()
    lClosedSession.mTimeout = true

    lResult = Some(iKey, lClosedSession)
  } else if (iState.exists) {
      val lUpdatedSession = updateSession(lCurrentSession, iValue)
      iState.update(lUpdatedSession)

      lResult = Some(iKey, lUpdatedSession)

      // we wish to remove the lUpdatedSession from the state once retrieved with lResult
      /*if (lUpdatedSession.mTimeout) {
         iState.remove()
         lResult = None
       }*/
    } else {
       val lInitialState = initSession(iValue)
       iState.update(lInitialState)

       lResult = Some(iKey, lInitialState)
    }

    lResult
}

private def updateSession(iCurrentSession: Session, 
                          iNewData: Option[SessionEvent]): Session = {
    //user disconnects manually
    if (iNewData.get.mDisconnection) {
        iCurrentSession.mTimeout = true
    }

    iCurrentSession
}

您可以 return 更新状态作为 mapWithState 操作的 return 值 而不是调用 MapWithStateRDD.stateSnapshot .这样,最终状态在有状态 DStream 之外始终可用。

这意味着您可以:

else if (iState.exists) {
  val lUpdatedSession = updateSession(lCurrentSession, iValue)
  iState.update(lUpdatedSession)

  if (lUpdatedSession.mTimeout) {
    iState.remove()
  }

  Some(iKey, lUpdatedSession)
}

现在将图表更改为:

val lResultMapWithState = iResultParsing
                            .mapWithState(lStateSpec)
                            .filter { case (_, session) => session.mTimeout }

现在状态正在内部删除,但是因为您是从StateSpec函数中return获取它,所以它可用给你外面做进一步处理。