为什么 DStream.foreachRDD 失败并显示 java.io.NotSerializableException: org.apache.spark.SparkContext?

Why does DStream.foreachRDD fail with java.io.NotSerializableException: org.apache.spark.SparkContext?

我需要根据来自 Kafka 的处理数据使用 GraphX 构建一个图。但是,似乎 sc.parallelize() 引发了错误 java.io.NotSerializableException: org.apache.spark.SparkContext

......
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)

lines.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    ......
    // Build a graph
    val vertRDD = sc.parallelize(vertices)
    val edgeRDD = sc.parallelize(edge)
    val graph = Graph(vertRDD, edgeRDD, defaultUser)
    }
  })
})

我应该通过什么方式解决问题?

foreachRDD Spark Streaming 中的运算符 运行s 在驱动程序上的每个批处理间隔处理 RDD,然后您使用该驱动程序(通过它的 RDD)编写最终将自身变成的代码激发工作。

foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit Apply a function to each RDD in this DStream. This is an output operator, so 'this' DStream will be registered as an output stream and therefore materialized.

RDD.foreachPartition 是一个只会发生在执行者身上的动作。

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit Applies a function f to each partition of this RDD.

在任务可用于执行器上执行之前,它必须被序列化,因为 SparkContext 不可序列化,因此异常。这就是 Spark 确保 SparkContext(如 sc)永远不会因 Spark 中的设计决策而出现的方式。无论如何,这没有任何意义,因为整个调度基础设施都在驱动程序上。

SparkContextRDD 仅在驱动程序上可用。它们只是描述您的分布式计算的一种方式,最终将 "translated" 用于 Spark 执行器上 运行 的任务。

这就是您看到错误消息的原因:

java.io.NotSerializableException: org.apache.spark.SparkContext

顺便说一句,我今天回答了一个类似的问题(参见 )所以今天看起来像是 SparkContext 日 :)