Spark Streaming 丢失 SparkContext
Spark Streaming losing SparkContext
我有一些非典型问题。当我尝试处理从 kafka 接收到的 rdd 时,当我尝试访问 sparkContext 时出现异常(java.lang.NullPointerException)。 RDDProcessor 是可序列化的
def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = {
val stringFromByte = b2s(byteArray)
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n"))
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq))
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema)
dateframe
}
问题是这样开始的:
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext))
}
但是当我只处理第一个rdd时,问题就不会发生
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rddProcessor.processingRDD(rdd.first(), sqlContext)
}
真不知道为什么问题这么大。如果有人有提示,我将不胜感激
@编辑
我定义 StreamingContext
val sparkConf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration))
好吧,SparkContext
不可序列化,它可以在 SqlContext
中通过 SparkSession
获得,其中标记为 @transient
。所以如果你不能以这样的方式写 processingRDD
它永远不会使用 SparkContext
,你就不能在需要序列化的 lambda 中使用它,比如 foreach
的或 map
的论点(但不是 foreachRDD
的论点!)。
我有一些非典型问题。当我尝试处理从 kafka 接收到的 rdd 时,当我尝试访问 sparkContext 时出现异常(java.lang.NullPointerException)。 RDDProcessor 是可序列化的
def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = {
val stringFromByte = b2s(byteArray)
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n"))
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq))
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema)
dateframe
}
问题是这样开始的:
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext))
}
但是当我只处理第一个rdd时,问题就不会发生
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rddProcessor.processingRDD(rdd.first(), sqlContext)
}
真不知道为什么问题这么大。如果有人有提示,我将不胜感激
@编辑 我定义 StreamingContext
val sparkConf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration))
好吧,SparkContext
不可序列化,它可以在 SqlContext
中通过 SparkSession
获得,其中标记为 @transient
。所以如果你不能以这样的方式写 processingRDD
它永远不会使用 SparkContext
,你就不能在需要序列化的 lambda 中使用它,比如 foreach
的或 map
的论点(但不是 foreachRDD
的论点!)。