Unable to read HBase data with Spark in YARN cluster mode

Cluster configuration:

What I'm trying to do: read HBase data with Spark.

Everything works fine when I run it from IntelliJ in local mode, but when I switch to spark-submit --master yarn, the following stack trace appears:

20/05/20 11:00:46 ERROR mapreduce.TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:221)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:200)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:243)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
    at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:673)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
    ... 27 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hbase.client.ConnectionImplementation.close(ConnectionImplementation.java:1938)
    at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:310)
    ... 32 more

20/05/20 11:00:46 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:254)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
    at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:673)
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:558)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:249)
    ... 24 more

Here is my code:

    // Imports needed for this snippet (Spark core plus the hbase-mapreduce module)
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    val conf: SparkConf = new SparkConf().setAppName("spark1")
    val spark = new SparkContext(conf)

    // HBase connection settings: ZooKeeper quorum and the table to scan
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "idx_name")
    hbaseConf.set("hbase.defaults.for.version.skip", "true")

    // Read the table as an RDD of (row key, row result) pairs
    val rdd: RDD[(ImmutableBytesWritable, Result)] = spark.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

This is an HBase classpath problem on the cluster: you need to add the HBase jars to the classpath, for example like this:

 export SPARK_CLASSPATH=$SPARK_CLASSPATH:`hbase classpath`

hbase classpath prints all the jars required for the HBase connection and related classes.
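
Note that SPARK_CLASSPATH is deprecated in newer Spark releases, so an alternative is to ship the HBase jars with spark-submit itself. The following is only a rough sketch under assumptions: hbase mapredcp prints the HBase MapReduce dependency classpath, the deploy options mirror the question's YARN setup, and hbase-on-spark.jar is a placeholder for your own application jar.

    # Collect the HBase dependency jars; hbase mapredcp prints a ':'-separated classpath
    HBASE_JARS=$(hbase mapredcp | tr ':' ',')

    # Ship those jars together with the application (jar name is a placeholder)
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --class com.song.HbaseOnSpark1 \
      --jars "$HBASE_JARS" \
      hbase-on-spark.jar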

Why does it work in local mode?

Because in local mode all the required jars are already on the IDE's classpath (its lib directory).


If you are using Maven, run mvn dependency:tree to see which jars are needed on the cluster. Based on that, you can adjust your spark-submit script, as in the sketch below.
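
A hedged example (the -Dincludes filter is optional and only narrows the output to HBase artifacts):

    # List only the HBase artifacts your build pulls in; every jar shown here,
    # plus its transitive dependencies, must be reachable on the cluster
    mvn dependency:tree -Dincludes=org.apache.hbase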

If you are using the --jars option, check that all the jars are actually passed through; if you build an uber jar, check that the right dependencies were packaged into it when the jar was assembled.
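
One quick sanity check (the jar name is again a placeholder for your own artifact) is to list the contents of the uber jar and confirm the HBase client classes actually made it in:

    # Verify the packaged jar contains the HBase client classes
    jar tf hbase-on-spark.jar | grep "org/apache/hadoop/hbase" | head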

There may also be a jar version conflict, so compare carefully against your local-mode environment, since everything works there.

Further reading