Spark mapWithState API 解释

Spark mapWithState API explanation

我一直在使用 Spark Streaming 中的 mapWithState API,但是 StateSpec.function 有两点不清楚:

假设我的函数是:

def trackStateForKey(batchTime: Time,
                     key: Long,
                     newValue: Option[JobData],
                     currentState: State[JobData]): Option[(Long, JobData)]
  1. 为什么新值是 Option[T] 类型?据我所知,它总是为我定义的,并且由于应该使用新状态调用该方法,所以我真的不明白为什么它可以是可选的。

  2. return 值是什么意思?我试图在文档和源代码中找到一些指示,但其中 none 描述了它的用途。由于我正在使用 state.remove()state.update() 修改键的状态,为什么我必须对 return 值做同样的事情?

    在我当前的实现中,我 return None 如果我删除密钥, Some(newState) 如果我更新它,但我不确定这是否正确。

Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.

这是一个 Option[T],因为如果您使用 StateSpec.timeout 设置超时,例如:

StateSpec.function(spec _).timeout(Milliseconds(5000))

那么函数超时后传入的值将是 None 并且 State[T] 上的 isTimingOut 方法将返回 true。这是有道理的,因为状态超时并不意味着指定键的新值已经到达,并且通常比为 T 传递 null 更安全(这不适用于无论如何基元)因为你希望用户安全地操作 Option[T].

您可以在 Sparks 实现中看到:

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
    mappedData ++= returned
    newStateMap.remove(key)
  }
}

What does the return value mean? I tried to find some pointers in the documentations and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?

return 值是一种沿着火花图传递中间状态的方法。例如,假设我想更新我的状态,但也在我的管道中使用 中间 数据执行一些操作,例如:

dStream
  .mapWithState(stateSpec)
  .map(optionIntermediateResult.map(_ * 2))
  .foreachRDD( /* other stuff */)

那个 return 值正是让我能够继续对所述数据进行操作的原因。如果你不关心中间结果,只想要完整的状态,那么输出 None 完全没问题。

编辑:

我写了一篇 blog post(在这个问题之后)试图对 API.

进行深入的解释