"java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext"执行spark streaming时
"java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext" When execute spark streaming
I keep getting the following error when I run a Spark Streaming application on YARN.
Why does this error occur, and how can I fix it? Any advice would be appreciated, thanks~
15/05/07 11:11:50 INFO dstream.StateDStream: Marking RDD 2364 for time 1430968310000 ms for checkpointing
15/05/07 11:11:50 INFO scheduler.JobScheduler: Added jobs for time 1430968310000 ms
15/05/07 11:11:50 INFO scheduler.JobGenerator: Checkpointing graph for time 1430968310000 ms
15/05/07 11:11:50 INFO streaming.DStreamGraph: Updating checkpoint data for time 1430968310000 ms
15/05/07 11:11:50 INFO streaming.DStreamGraph: Updated checkpoint data for time 1430968310000 ms
15/05/07 11:11:50 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
The Spark Streaming application code is below; I run it in the spark-shell:
import kafka.cluster.Cluster
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(0)
}
val ssc = new StreamingContext(sc, new Duration(5000))
ssc.checkpoint(".")
val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))
val uuidDstream = lines.transform(rdd => rdd.map(_._2)).map(x => (x, 1)).updateStateByKey[Int](updateFunc)
uuidDstream.count().print()
ssc.start()
ssc.awaitTermination()
The reference to the val updateFunc used in the closure of updateStateByKey pulls the rest of the enclosing instance into the closure, and the StreamingContext along with it.
Two options:
- Quick fix: declare the streaming context transient (see the sketch after this list) =>
@transient val ssc = ...
It is probably a good idea to annotate the dstream declarations with @transient as well.
- Better alternative: put the functions in a separate object, like this:
case object TransformFunctions {
  val updateFunc = ???
}
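A minimal sketch of how the two suggestions combine for the code in the question, assuming it still runs in spark-shell (so sc is predefined) with the Kafka assembly on the classpath; the object name TransformFunctions follows the answer, and the update logic simply mirrors the placeholder Some(0) from the question:

import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Holding the function in a standalone object keeps the closure free of
// the enclosing instance, so the StreamingContext is never serialized.
object TransformFunctions {
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(0) // placeholder logic from the question
}

@transient val ssc = new StreamingContext(sc, new Duration(5000))
ssc.checkpoint(".")

@transient val lines = KafkaUtils.createStream(
  ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))

@transient val uuidDstream = lines
  .transform(rdd => rdd.map(_._2))
  .map(x => (x, 1))
  .updateStateByKey[Int](TransformFunctions.updateFunc)

uuidDstream.count().print()
ssc.start()
ssc.awaitTermination()

Either change alone may be enough; moving the function out of the driver's scope is the more robust fix because it removes the accidental capture instead of merely marking the captured fields transient.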