Querying HBase from a Spark cluster raises "java.lang.IllegalStateException: unread block data"

Our Spark setup runs on 3 servers, and all of them can see the HBase cluster servers.

I am using Hadoop 2.7.3, HBase 1.2.6, and Spark 2.1.3.

I connect to Spark with

/opt/spark/bin/spark-shell --master spark://master:7077

and run the following:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.client.TableDescriptor
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import scala.collection.JavaConverters._

val conf = HBaseConfiguration.create()
val tablename = "default:Table1"
conf.set(TableInputFormat.INPUT_TABLE,tablename)
val admin = new HBaseAdmin(conf)
admin.isTableAvailable(tablename)
val hBaseRDD = sc.
  newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
hBaseRDD.count()

When I run this in spark-shell, admin.isTableAvailable(tablename) returns true.

That suggests Spark can reach HBase, but calling hBaseRDD.count() raises the following error:

java.lang.IllegalStateException: unread block data
 at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2776)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1600)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2280)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2204)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2062)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
 at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
 at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1152)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
 at java.lang.Thread.run(Thread.java:748)
2018-07-17 15:58:54,974 ERROR [task-result-getter-3] scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.11.1.12, executor 2): java.lang.IllegalStateException: unread block data
 at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2776)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1600)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2280)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2204)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2062)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
 at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
 at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1152)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
 at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1455)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1443)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1442)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1442)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1625)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1614)
 at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1968)
 at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
 ... 52 elided
Caused by: java.lang.IllegalStateException: unread block data
 at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2776)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1600)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2280)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2204)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2062)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
 at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
 at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1152)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
 at java.lang.Thread.run(Thread.java:748)

The script works when executed in spark-shell; the error occurs when the job is submitted to the cluster.

Based on some research, it looks like there may be a version mismatch, or the problem may come from the way I set spark.driver.extraClassPath, which I set to:

/opt/spark/jars/*:/opt/hbase/lib/commons-collections-3.2.2.jar:/opt/hbase/lib/commons-httpclient-3.1.jar:/opt/hbase/lib/findbugs-annotations-1.3.9-1.jar:/opt/hbase/lib/hbase-annotations-1.2.6.jar:/opt/hbase/lib/hbase-annotations-1.2.6-tests.jar:/opt/hbase/lib/hbase-client-1.2.6.jar:/opt/hbase/lib/hbase-common-1.2.6.jar:/opt/hbase/lib/hbase-common-1.2.6-tests.jar:/opt/hbase/lib/hbase-examples-1.2.6.jar:/opt/hbase/lib/hbase-external-blockcache-1.2.6.jar:/opt/hbase/lib/hbase-hadoop2-compat-1.2.6.jar:/opt/hbase/lib/hbase-hadoop-compat-1.2.6.jar:/opt/hbase/lib/hbase-it-1.2.6.jar:/opt/hbase/lib/hbase-it-1.2.6-tests.jar:/opt/hbase/lib/hbase-prefix-tree-1.2.6.jar:/opt/hbase/lib/hbase-procedure-1.2.6.jar:/opt/hbase/lib/hbase-protocol-1.2.6.jar:/opt/hbase/lib/hbase-resource-bundle-1.2.6.jar:/opt/hbase/lib/hbase-rest-1.2.6.jar:/opt/hbase/lib/hbase-server-1.2.6.jar:/opt/hbase/lib/hbase-server-1.2.6-tests.jar:/opt/hbase/lib/hbase-shell-1.2.6.jar:/opt/hbase/lib/hbase-thrift-1.2.6.jar:/opt/hbase/lib/jetty-util-6.1.26.jar:/opt/hbase/lib/ruby/hbase:/opt/hbase/lib/ruby/hbase/hbase.rb:/opt/hbase/lib/ruby/hbase.rb:/opt/hbase/lib/protobuf-java-2.5.0.jar:/opt/hbase/lib/metrics-core-2.2.0.jar:/opt/hbase/lib/htrace-core-3.1.0-incubating.jar:/opt/hbase/lib/guava-12.0.1.jar:/opt/hbase/lib/asm-3.1.jar:/opt/hbase/lib/Cdrpackage.jar:/opt/hbase/lib/commons-daemon-1.0.13.jar:/opt/hbase/lib/commons-el-1.0.jar:/opt/hbase/lib/commons-math-2.2.jar:/opt/hbase/lib/disruptor-3.3.0.jar:/opt/hbase/lib/jamon-runtime-2.4.1.jar:/opt/hbase/lib/jasper-compiler-5.5.23.jar:/opt/hbase/lib/jasper-runtime-5.5.23.jar:/opt/hbase/lib/jaxb-impl-2.2.3-1.jar:/opt/hbase/lib/jcodings-1.0.8.jar:/opt/hbase/lib/jersey-core-1.9.jar:/opt/hbase/lib/jersey-guice-1.9.jar:/opt/hbase/lib/jersey-json-1.9.jar:/opt/hbase/lib/jettison-1.3.3.jar:/opt/hbase/lib/jetty-sslengine-6.1.26.jar:/opt/hbase/lib/joni-2.1.2.jar:/opt/hbase/lib/jruby-complete-1.6.8.jar:/opt/hbase/lib/jsch-0.1.42.jar:/opt/hbase/lib/jsp-2.1-6.1.14.jar:/opt/hbase/lib/junit-4.12.jar:/opt/hbase/lib/servlet-api-2.5-6.1.14.jar:/opt/hbase/lib/servlet-api-2.5.jar:/opt/hbase/lib/spymemcached-2.11.6.jar:/opt/hive-hbase//opt/hive-hbase/hive-hbase-handler-2.0.1.jar

Is there any solution? Or is this a Spark problem or bug?

I found the solution.

First I tried upgrading the Java, Hadoop, HBase, and Spark versions, but I got the same exception.

I finally found the cause. In spark/conf/spark-defaults.conf I had added the spark.driver.extraClassPath parameter, but the workers need the spark.executor.extraClassPath parameter to find those jar files.
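As a rough illustration, the relevant part of spark-defaults.conf might look like the fragment below. The `/opt/hbase/lib/*` wildcard is shorthand assumed here for readability and is not the exact value from the question; the full jar list can be used instead. The jars also have to exist at the same paths on every worker node, since these settings only extend the classpath and do not copy files.

```properties
# Classpath seen by the driver (the spark-shell process).
spark.driver.extraClassPath     /opt/spark/jars/*:/opt/hbase/lib/*

# Classpath seen by the executors on the workers; this was the missing entry.
# Assumption: the HBase jars are installed under /opt/hbase/lib on each worker.
spark.executor.extraClassPath   /opt/spark/jars/*:/opt/hbase/lib/*
```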

So the previous versions of these components work fine once that parameter is set.
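One way to confirm this kind of problem is to ask the executors directly whether they can load an HBase class. The snippet below is only a sketch meant to be pasted into spark-shell, where `sc` is already defined; the names `probeClass` and `visibility` are made up for illustration, and the partition count is arbitrary, so not every executor is guaranteed to receive a task.

```scala
// Run inside spark-shell, where `sc` (the SparkContext) already exists.
import scala.util.Try

// Hypothetical probe: a class the executors need in order to run the HBase job.
val probeClass = "org.apache.hadoop.hbase.mapreduce.TableInputFormat"

// Show what the current session has configured for each side.
println(sc.getConf.getOption("spark.driver.extraClassPath"))
println(sc.getConf.getOption("spark.executor.extraClassPath"))

// Run a small job that touches no HBase types, so it can be deserialized
// even when the HBase jars are missing on the workers. Each partition
// reports its hostname and whether the probe class could be loaded there.
val visibility = sc.parallelize(1 to 100, 10).mapPartitions { _ =>
  val host = java.net.InetAddress.getLocalHost.getHostName
  Iterator((host, Try(Class.forName(probeClass)).isSuccess))
}.distinct().collect()

visibility.foreach { case (host, loaded) => println(s"$host -> $loaded") }
```

If any host reports `false`, the executors on that machine cannot see the HBase jars, which would be consistent with the driver-side check succeeding while the job itself fails.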