Hortonworks Spark HBase Connector (SHC) - NullPointerException while writing a DataFrame to HBase

Below is a simple program that writes a Spark DataFrame to HBase using the Spark-HBase connector. The df.write operation throws a NullPointerException and the DataFrame never gets written to HBase. Reading from HBase with the same connector works fine, however. The issue is discussed in the links below, but the suggested solutions did not help.

https://github.com/hortonworks-spark/shc/issues/278

https://github.com/hortonworks-spark/shc/issues/46
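
For contrast, reading through the connector does work for me. A minimal sketch of such a read (not the exact code I ran, just the standard pattern, assuming the same SparkSession, catalog JSON and data source format as in the write code below):

// Read sketch: load the employee table back into a DataFrame using the same catalog.
val readDf = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

readDf.show(false)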

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

object SparkHbaseHW {

  case class Employee(key: String, fName: String, lName: String,
                      mName: String, addressLine: String, city: String,
                      state: String, zipCode: String)

  def main(args: Array[String]): Unit = {

    // Catalog JSON: maps the DataFrame columns to the HBase row key and the
    // "person" / "address" column families of the "employee" table.
    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin


    val data = Seq(Employee("1", "Abby", "Smith", "K", "3456 main", "Orlando", "FL", "45235"),
      Employee("2", "Amaya", "Williams", "L", "123 Orange", "Newark", "NJ", "27656"),
      Employee("3", "Alchemy", "Davis", "P", "Warners", "Sanjose", "CA", "34789"))


    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkHbaseWrite")
      .getOrCreate()


    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF


    // newTable -> "4": create the HBase table with 4 regions if it does not already exist.
    df.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}

Exception:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:124)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:202)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:179)
    at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:391)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply$mcV$sp(PairRDDFunctions.scala:1083)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.insert(HBaseRelation.scala:230)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:61)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:656)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:656)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at com.test.HBaseSparkHW$.main(HbaseSparkHW.scala:55)
    at com.test.HBaseSparkHW.main(HbaseSparkHW.scala)
20/05/18 16:59:50 INFO SparkContext: Invoking stop() from shutdown hook

As discussed in , I made an additional configuration change to the SparkSession builder and the exception went away. However, I am not clear on the cause or why this fixes it; I hope someone can explain.

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("HbaseSparkWrite")
  .config("spark.hadoop.validateOutputSpecs", false)
  .getOrCreate()
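
From the stack trace, my guess is that spark.hadoop.validateOutputSpecs only controls whether Spark calls checkOutputSpecs before writing; with it set to false, the TableOutputFormat.checkOutputSpecs frame where the NullPointerException originates is skipped entirely, so the write proceeds. I would still like to understand why the NPE occurs in the first place. For completeness, a sketch of the same flag applied through a SparkConf instead of the builder's .config(...) call (a hypothetical alternative, not what I actually ran):

import org.apache.spark.SparkConf

// Same setting, placed on a SparkConf before the session is created.
val conf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("HbaseSparkWrite")
  .set("spark.hadoop.validateOutputSpecs", "false")

val spark: SparkSession = SparkSession.builder()
  .config(conf)
  .getOrCreate()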