Spark mapWithState API 解释
Spark mapWithState API explanation
我一直在使用 Spark Streaming 中的 mapWithState
API,但是 StateSpec.function
有两点不清楚:
假设我的函数是:
def trackStateForKey(batchTime: Time,
key: Long,
newValue: Option[JobData],
currentState: State[JobData]): Option[(Long, JobData)]
为什么新值是 Option[T]
类型?据我所知,它总是为我定义的,并且由于应该使用新状态调用该方法,所以我真的不明白为什么它可以是可选的。
return 值是什么意思?我试图在文档和源代码中找到一些指示,但其中 none 描述了它的用途。由于我正在使用 state.remove()
和 state.update()
修改键的状态,为什么我必须对 return 值做同样的事情?
在我当前的实现中,我 return None
如果我删除密钥, Some(newState)
如果我更新它,但我不确定这是否正确。
Why is the new value an Option[T]
type? As far as I've seen, it was
always defined for me, and since the method is supposed to be called
with a new state, I don't really see the point why it could be
optional.
这是一个 Option[T]
,因为如果您使用 StateSpec.timeout
设置超时,例如:
StateSpec.function(spec _).timeout(Milliseconds(5000))
那么函数超时后传入的值将是 None
并且 State[T]
上的 isTimingOut
方法将返回 true。这是有道理的,因为状态超时并不意味着指定键的新值已经到达,并且通常比为 T
传递 null
更安全(这不适用于无论如何基元)因为你希望用户安全地操作 Option[T]
.
您可以在 Sparks 实现中看到:
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
mappedData ++= returned
newStateMap.remove(key)
}
}
What does the return value mean? I tried to find some pointers in the
documentations and source code, but none of them describe what it is
used for. Since I'm modifying the state of a key using state.remove()
and state.update(), why would I have to do the same with return
values?
return 值是一种沿着火花图传递中间状态的方法。例如,假设我想更新我的状态,但也在我的管道中使用 中间 数据执行一些操作,例如:
dStream
.mapWithState(stateSpec)
.map(optionIntermediateResult.map(_ * 2))
.foreachRDD( /* other stuff */)
那个 return 值正是让我能够继续对所述数据进行操作的原因。如果你不关心中间结果,只想要完整的状态,那么输出 None
完全没问题。
编辑:
我写了一篇 blog post(在这个问题之后)试图对 API.
进行深入的解释
我一直在使用 Spark Streaming 中的 mapWithState
API,但是 StateSpec.function
有两点不清楚:
假设我的函数是:
def trackStateForKey(batchTime: Time,
key: Long,
newValue: Option[JobData],
currentState: State[JobData]): Option[(Long, JobData)]
为什么新值是
Option[T]
类型?据我所知,它总是为我定义的,并且由于应该使用新状态调用该方法,所以我真的不明白为什么它可以是可选的。return 值是什么意思?我试图在文档和源代码中找到一些指示,但其中 none 描述了它的用途。由于我正在使用
state.remove()
和state.update()
修改键的状态,为什么我必须对 return 值做同样的事情?在我当前的实现中,我 return
None
如果我删除密钥,Some(newState)
如果我更新它,但我不确定这是否正确。
Why is the new value an
Option[T]
type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.
这是一个 Option[T]
,因为如果您使用 StateSpec.timeout
设置超时,例如:
StateSpec.function(spec _).timeout(Milliseconds(5000))
那么函数超时后传入的值将是 None
并且 State[T]
上的 isTimingOut
方法将返回 true。这是有道理的,因为状态超时并不意味着指定键的新值已经到达,并且通常比为 T
传递 null
更安全(这不适用于无论如何基元)因为你希望用户安全地操作 Option[T]
.
您可以在 Sparks 实现中看到:
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
mappedData ++= returned
newStateMap.remove(key)
}
}
What does the return value mean? I tried to find some pointers in the documentations and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?
return 值是一种沿着火花图传递中间状态的方法。例如,假设我想更新我的状态,但也在我的管道中使用 中间 数据执行一些操作,例如:
dStream
.mapWithState(stateSpec)
.map(optionIntermediateResult.map(_ * 2))
.foreachRDD( /* other stuff */)
那个 return 值正是让我能够继续对所述数据进行操作的原因。如果你不关心中间结果,只想要完整的状态,那么输出 None
完全没问题。
编辑:
我写了一篇 blog post(在这个问题之后)试图对 API.
进行深入的解释