sparkSession.sql 抛出 NullPointerException
sparkSession.sql throwing NullPointerException
我有两个 scala classes 作为我的 spark-sql 工作的一部分,即 Driver.scala 和 ExtractorOne.scala.
Driver.scala 将不同的参数(如 sparkSession 对象等)传递给不同的提取器(如 ExtractorOne.scala 等)
在 Extractor classes 中,我从 oracle 中提取数据并在 hdfs 位置写入镶木地板文件。
作为业务逻辑的一部分,我必须调用 sparkSession.sql() 来执行一些操作。但是在 Extractor/calling class 的 extract() 方法中,sparkSession 导致了 Nullpointer 异常......所以我试图通过调用 sparkSession.sql("show tables" 在调用函数中检查它).show() 它给出了结果,即对象没有问题。当调用相同的时候,即 sparkSession.sql("show tables").show() 在被调用函数中它抛出 Nullpointer 异常......知道我在这里做错了什么吗?
'
Driver.scala
val spark = ConfigUtils.getSparkSession( ...); //spark session initialization successful
val parquetDf = spark.read.format("parquet"); // able to read parquet file data and got the dataframe.
val extractors : LinkedHashMap[String, (DataFrameReader, SparkSession, String, String,String,String) => Unit] = Utils.getAllDefinedExtractors();
///ExtractorOne.scala ExtractorTwo.scala ..etc are extractors as shown in other scala file
for ( key:String <- extractors.keys){
extractors.get(key).map{
spark.sql("show tables").show() ///output
fun => fun(ora_df_options_conf,spark,keyspace,key.trim(),"","")
}
}
'
spark.sql("show tables").show() 的输出:::
spark.sql("show tables").show()
> Blockquote
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
但同样在 ExtractorOne.scala
中给出了错误
'
ExtractorOne.scala
def extract(oraOptionDfConfig: DataFrameReader, sparkSession: SparkSession, keyspace: String,
columnFamilyName: String, fromDate:String , toDate:String ) : Unit ={
val company_df = ..// some opeartion to read the data from oracle to company_df
val dist_df = company_df.distinct("id")
company_df.createOrReplaceTempView("company")
dist_df.foreach( row =>{
if(row.anyNull){
}else{
val sqlQuery:String = s" select * from company where id='%s' and quarter='%s' and year='%s' ".format( row.get(0) , row.get(1) , row.get(2))
sparkSession.sql("show tables").show() ///output...
var partitionDf = sparkSession.sql(sqlQuery)
partitionDf.show(1)
writeAsParquet(...) ///save as parquet file/s
}
}
'
sparkSession.sql("show tables").show() 的输出:::
错误:
'
原因:java.lang.NullPointerException
在 org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142)
在 org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140)
在 org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
在 com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:126)
在 com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:113)
在 scala.collection.Iterator$class.foreach(Iterator.scala:891)
在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
'
您不能在执行程序端代码中使用 SparkSession(即在 dist_df.foreach
循环中),在这种情况下 Spark Session 为 null(它仅存在于驱动程序中)
我有两个 scala classes 作为我的 spark-sql 工作的一部分,即 Driver.scala 和 ExtractorOne.scala.
Driver.scala 将不同的参数(如 sparkSession 对象等)传递给不同的提取器(如 ExtractorOne.scala 等)
在 Extractor classes 中,我从 oracle 中提取数据并在 hdfs 位置写入镶木地板文件。
作为业务逻辑的一部分,我必须调用 sparkSession.sql() 来执行一些操作。但是在 Extractor/calling class 的 extract() 方法中,sparkSession 导致了 Nullpointer 异常......所以我试图通过调用 sparkSession.sql("show tables" 在调用函数中检查它).show() 它给出了结果,即对象没有问题。当调用相同的时候,即 sparkSession.sql("show tables").show() 在被调用函数中它抛出 Nullpointer 异常......知道我在这里做错了什么吗?
'
Driver.scala
val spark = ConfigUtils.getSparkSession( ...); //spark session initialization successful
val parquetDf = spark.read.format("parquet"); // able to read parquet file data and got the dataframe.
val extractors : LinkedHashMap[String, (DataFrameReader, SparkSession, String, String,String,String) => Unit] = Utils.getAllDefinedExtractors();
///ExtractorOne.scala ExtractorTwo.scala ..etc are extractors as shown in other scala file
for ( key:String <- extractors.keys){
extractors.get(key).map{
spark.sql("show tables").show() ///output
fun => fun(ora_df_options_conf,spark,keyspace,key.trim(),"","")
}
}
'
spark.sql("show tables").show() 的输出:::
spark.sql("show tables").show()
> Blockquote
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
但同样在 ExtractorOne.scala
中给出了错误'
ExtractorOne.scala
def extract(oraOptionDfConfig: DataFrameReader, sparkSession: SparkSession, keyspace: String,
columnFamilyName: String, fromDate:String , toDate:String ) : Unit ={
val company_df = ..// some opeartion to read the data from oracle to company_df
val dist_df = company_df.distinct("id")
company_df.createOrReplaceTempView("company")
dist_df.foreach( row =>{
if(row.anyNull){
}else{
val sqlQuery:String = s" select * from company where id='%s' and quarter='%s' and year='%s' ".format( row.get(0) , row.get(1) , row.get(2))
sparkSession.sql("show tables").show() ///output...
var partitionDf = sparkSession.sql(sqlQuery)
partitionDf.show(1)
writeAsParquet(...) ///save as parquet file/s
}
}
'
sparkSession.sql("show tables").show() 的输出:::
错误:
'
原因:java.lang.NullPointerException
在 org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142)
在 org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140)
在 org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
在 com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:126)
在 com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:113)
在 scala.collection.Iterator$class.foreach(Iterator.scala:891)
在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
'
您不能在执行程序端代码中使用 SparkSession(即在 dist_df.foreach
循环中),在这种情况下 Spark Session 为 null(它仅存在于驱动程序中)