Spark Streaming HiveContext NullPointerException
Spark Streaming HiveContext NullPointerException
我正在 CDH 5.8.3 集群上使用 Spark 1.6.0 编写一个 Spark Streaming 应用程序。该应用程序非常简单:它从 Kafka 读取数据,对 DStream/RDDs 进行一些转换,然后将它们输出到 Hive table。我也尝试使用 sqlContext 放置一些愚蠢的示例代码,但错误仍然存在。
我的问题是我无法在 DStream 的 foreachRDD 语句中使用 HiveContext。
我的代码如下所示:
val sc = new SparkContext()
val sqlContext = new HiveContext(sc)
val ssc = new StreamingContext(sc, Minutes(sparkBatchInterval))
ssc.checkpoint(CHECKPOINT_DIR)
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokersList, "auto.offset.reset" -> "smallest")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))
val validatedAndPersisted = dstream.transform( rdd => {...}).persist(StorageLevel.MEMORY_AND_DISK_SER)
val recordsToBeIngested = ...
recordsToBeIngested.foreachRDD(rdd=> {
rdd.persist(StorageLevel.MEMORY_AND_DISK)
val ingestCount = rdd.count
if(ingestCount>0) {
sqlContext.tables("sc4").show() //here actually I should have an insertInto
}
})
我得到的错误是这个:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
at org.apache.spark.sql.hive.HiveContext$$anonfun$configure.apply(HiveContext.scala:540)
at org.apache.spark.sql.hive.HiveContext$$anonfun$configure.apply(HiveContext.scala:539)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:539)
at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:252)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:239)
at org.apache.spark.sql.hive.HiveContext$$anon.<init>(HiveContext.scala:459)
at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:459)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:458)
at org.apache.spark.sql.hive.HiveContext$$anon.<init>(HiveContext.scala:475)
at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:475)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:474)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.tables(SQLContext.scala:855)
at myPackage.Ingestion$$anonfun$createStreamingContext.apply(Ingestion.scala:173)
at myPackage.Ingestion$$anonfun$createStreamingContext.apply(Ingestion.scala:166)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
您是否知道导致此错误的原因或我该如何解决?
谢谢,
马可
我自己找到了答案。问题在于我在创建 StreamingContext 之前就创建了 HiveContext。将 HiveContext 的创建移到 StreamingContext 创建之后,问题就解决了。
我正在 CDH 5.8.3 集群上使用 Spark 1.6.0 编写一个 Spark Streaming 应用程序。该应用程序非常简单:它从 Kafka 读取数据,对 DStream/RDDs 进行一些转换,然后将它们输出到 Hive table。我也尝试使用 sqlContext 放置一些愚蠢的示例代码,但错误仍然存在。
我的问题是我无法在 DStream 的 foreachRDD 语句中使用 HiveContext。
我的代码如下所示:
val sc = new SparkContext()
val sqlContext = new HiveContext(sc)
val ssc = new StreamingContext(sc, Minutes(sparkBatchInterval))
ssc.checkpoint(CHECKPOINT_DIR)
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokersList, "auto.offset.reset" -> "smallest")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))
val validatedAndPersisted = dstream.transform( rdd => {...}).persist(StorageLevel.MEMORY_AND_DISK_SER)
val recordsToBeIngested = ...
recordsToBeIngested.foreachRDD(rdd=> {
rdd.persist(StorageLevel.MEMORY_AND_DISK)
val ingestCount = rdd.count
if(ingestCount>0) {
sqlContext.tables("sc4").show() //here actually I should have an insertInto
}
})
我得到的错误是这个:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
at org.apache.spark.sql.hive.HiveContext$$anonfun$configure.apply(HiveContext.scala:540)
at org.apache.spark.sql.hive.HiveContext$$anonfun$configure.apply(HiveContext.scala:539)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:539)
at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:252)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:239)
at org.apache.spark.sql.hive.HiveContext$$anon.<init>(HiveContext.scala:459)
at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:459)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:458)
at org.apache.spark.sql.hive.HiveContext$$anon.<init>(HiveContext.scala:475)
at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:475)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:474)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.tables(SQLContext.scala:855)
at myPackage.Ingestion$$anonfun$createStreamingContext.apply(Ingestion.scala:173)
at myPackage.Ingestion$$anonfun$createStreamingContext.apply(Ingestion.scala:166)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
您是否知道导致此错误的原因或我该如何解决?
谢谢, 马可
我自己找到了答案。问题在于我在创建 StreamingContext 之前就创建了 HiveContext。将 HiveContext 的创建移到 StreamingContext 创建之后,问题就解决了。