java.io.NotSerializableException in Spark Streaming with enabled checkpointing
Here is the code:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import scala.collection.mutable.ListBuffer

def main(args: Array[String]) {
  val sc = new SparkContext
  val sec = Seconds(3)
  val ssc = new StreamingContext(sc, sec)
  ssc.checkpoint("./checkpoint")

  val rdd = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
  val inputDStream = new ConstantInputDStream(ssc, rdd)

  inputDStream.transform(rdd => {
    val buf = ListBuffer[String]()
    buf += "1"
    buf += "2"
    buf += "3"
    val other_rdd = ssc.sparkContext.parallelize(buf) // create a new RDD
    rdd.union(other_rdd)
  }).print()

  ssc.start()
  ssc.awaitTermination()
}
It throws this exception:
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5626e185)
- field (class: com.mirrtalk.Test$$anonfun$main, name: ssc, type: class org.apache.spark.streaming.StreamingContext)
- object (class com.mirrtalk.Test$$anonfun$main, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$$anonfun$apply, name: cleanedF, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$$anonfun$apply, <function2>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$$anonfun, name: cleanedF, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$$anonfun, <function2>)
- field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
When I remove the line ssc.checkpoint("./checkpoint"), the application runs fine, but I need checkpointing enabled.
How can I fix this problem with checkpointing enabled?
You can move the context initialization and configuration out of main. When checkpointing is enabled, Spark has to serialize the DStream graph, including the function passed to transform; as the serialization stack shows, that closure captures the local ssc, and StreamingContext is not serializable. With ssc defined as a field of a top-level object, the closure refers to it statically instead of capturing it:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import scala.collection.mutable.ListBuffer

object App {
  val sc = new SparkContext(new SparkConf().setAppName("foo").setMaster("local"))
  val sec = Seconds(3)
  val ssc = new StreamingContext(sc, sec)
  ssc.checkpoint("./checkpoint") // enable checkpointing

  def main(args: Array[String]) {
    val rdd = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
    val inputDStream = new ConstantInputDStream(ssc, rdd)

    inputDStream.transform(rdd => {
      val buf = ListBuffer[String]()
      buf += "1"
      buf += "2"
      buf += "3"
      val other_rdd = ssc.sparkContext.parallelize(buf)
      rdd.union(other_rdd) // union with the other RDD
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
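
If you prefer to keep everything inside main, an alternative is to avoid referencing ssc inside the transform function at all and take the SparkContext from the RDD argument instead, so the closure captures nothing non-serializable. A minimal sketch of that variant (not part of the original answer):

inputDStream.transform(rdd => {
  // rdd.sparkContext is obtained from the RDD itself at call time,
  // so the closure no longer captures the outer StreamingContext
  val other_rdd = rdd.sparkContext.parallelize(Seq("1", "2", "3"))
  rdd.union(other_rdd)
}).print()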