Spark Streaming stateful transformation mapWithState function getting error java.util.NoSuchElementException: None.get
I want to replace my updateStateByKey function with the mapWithState function (Spark 1.6) to improve the performance of my program.
I am following these two documents:
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html
https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html
But I am getting the error scala.MatchError: [Ljava.lang.Object:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc.apply(HbaseCoverageStream_mapwithstate.scala:84)
at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc.apply(HbaseCoverageStream_mapwithstate.scala:84)
at scala.Option.flatMap(Option.scala:170)
at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc(HbaseCoverageStream_mapwithstate.scala:84)
Code for reference:
def trackStateFunc(key: String, value: Option[Array[Long]],
                   current: State[Seq[Array[Long]]]): Option[Array[Long]] = {
  /* adding the current value to the previous state */
  val res = value.map(x => x +: current.getOption().get).orElse(current.getOption())
  current.update(res.get)
  res.flatMap {
    case as: Seq[Array[Long]] => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption // throws the MatchError
  }
}

val statespec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] = StateSpec.function(trackStateFunc _)
val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] = parsedStream.mapWithState(statespec)
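For context, one likely trigger for this MatchError (an assumption, since the StateSpec above declares the state type as Array[Long] while the function expects Seq[Array[Long]]): in Scala an Array does not extend Seq, so a value that is actually an array at runtime falls through a `case as: Seq[...]` pattern.

```scala
import scala.util.Try

// Minimal standalone reproduction (assumed root cause, no Spark needed):
// Array is not a Seq in Scala, so a runtime Array value fails the
// `case as: Seq[...]` pattern with a scala.MatchError on
// [Ljava.lang.Object, matching the stack trace above.
val runtimeValue: Any = Array(Array(1L, 2L))
val result = Try {
  runtimeValue match {
    case as: Seq[Array[Long]] => as.length
  }
}
println(result) // a Failure wrapping the scala.MatchError
```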
My previous working code using the updateStateByKey function:
val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
Your problem is probably the case where the value does not exist: you wrap the state in Some, so you should match on it. Alternatively, you can use state.getOption (check the examples in the links you attached again).
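A minimal sketch of that suggestion (the helper name and the element-wise sum are illustrative assumptions, not code from the question): handle the empty-value and empty-state cases explicitly instead of calling .get on an Option that may be None.

```scala
// Hypothetical helper showing the suggested shape: combine the incoming
// value with the previous state via explicit Option matching, so an
// absent value or absent state never triggers None.get.
def combine(value: Option[Array[Long]],
            prevState: Option[Array[Long]]): Option[Array[Long]] =
  (value, prevState) match {
    case (Some(v), Some(prev)) => Some(prev.zip(v).map { case (a, b) => a + b })
    case (Some(v), None)       => Some(v) // first batch for this key
    case (None, prev)          => prev    // no new data; keep the old state
  }
```

Inside the state function this would be called as combine(value, state.getOption()), updating the state only when the result is defined.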
Thanks Igor. I changed my trackStateFunc and it works now.
My working code using mapWithState, for reference:
def trackStateFunc(batchTime: Time, key: String, value: Option[Array[Long]],
                   state: State[Array[Long]]): Option[(String, Array[Long])] = {
  // Check if state exists
  if (state.exists) {
    // element-wise sum of the previous state and the new value
    val newState: Array[Long] = Array(state.get, value.get).transpose.map(_.sum)
    state.update(newState) // Set the new state
    Some((key, newState))
  } else {
    // value.get is safe here only because no timeout is configured,
    // so the function is never invoked with an empty value
    val initialState = value.get
    state.update(initialState) // Set the initial state
    Some((key, initialState))
  }
}
// StateSpec[KeyType, ValueType, StateType, MappedType]
val stateSpec: StateSpec[String, Array[Long], Array[Long], (String, Array[Long])] = StateSpec.function(trackStateFunc _)
val state: MapWithStateDStream[String, Array[Long], Array[Long], (String, Array[Long])] = parsedStream.mapWithState(stateSpec)
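As a standalone sanity check (plain Scala, no Spark needed) of the element-wise sum used in the working trackStateFunc above:

```scala
// The working code combines the previous state with the new value by
// transposing the pair of arrays and summing each column.
val prev     = Array(1L, 2L, 3L)
val incoming = Array(10L, 20L, 30L)
val newState = Array(prev, incoming).transpose.map(_.sum)
// newState is Array(11L, 22L, 33L)
```

Note that transpose assumes both arrays have the same length, which holds here because the state and each incoming value share one fixed layout.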