hBaseRDD.collect() 报错

hBaseRDD.collect() giving an error

我正在使用 spark 和 hbase。我用了 HBaseTest.scala.

rdd.count() 给出了准确的结果。但是当我尝试执行 rdd.collect() 时出现以下错误:

java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

无法找出问题所在。我想打印一些行的 hbase table.

我遇到了同样的问题并找到了解决方案 here

这是一个代码片段

type HBaseRow = java.util.NavigableMap[Array[Byte], java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]
type CFTimeseriesRowStr = scala.collection.immutable.Map[String, scala.collection.immutable.Map[String, scala.collection.immutable.Map[Long, String]]]
import scala.collection.JavaConverters._
def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr = navMap.map(cf =>
  (Bytes.toString(cf._1), cf._2.map(col =>
    (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))
def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
  .map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2))).take(10).foreach(println)