HBase 并发/并行扫描来自 Spark 1.6,Scala 2.10.6 除了多线程

HBase Concurrent / Parallel Scan from Spark 1.6, Scala 2.10.6 besides multithreading

我有一个行前缀列表Array("a", "b", ...)

我需要为每个 rowPrefix 查询 HBase(使用 Nerdammer)。我目前的解决方案是

case class Data(x: String)

val rowPrefixes = Array("a", "b", "c")

rowPrefixes.par
    .map( rowPrefix => {
          val rdd = sc.hbaseTable[Data]("tableName")
            .inColumnFamily("columnFamily")
            .withStartRow(rowPrefix)

          rdd
        })
    .reduce(_ union _)

我基本上是使用多线程 (.par) 加载多个 rdd,然后最后将它们全部合并。有一个更好的方法吗?除了 nerdammer,我不介意使用其他库。

此外,我担心 reflection API threadsafe issue 因为我正在将 hbase 读入 case class.

的 RDD

我没有使用过 Nerdammer 连接器,但如果我们考虑您的 4 个前缀行键过滤器的示例,使用 par 并行度将受到限制,集群可能未得到充分利用并且结果可能很慢。

您可以检查是否可以使用 Nerdammer 连接器实现以下功能,我使用了 hbase-spark 连接器 (CDH),在下面的方法中,将扫描所有 table 分区的行键前缀,即所有 table 区域并行分布在集群中,可以更有效地利用可用资源 (cores/RAM),更重要的是可以利用分布式计算的力量。

val hbaseConf = HBaseConfiguration.create()
// set zookeeper quorum properties in hbaseConf

val hbaseContext = new HBaseContext(sc, hbaseConf)

val rowPrefixes = Array("a", "b", "c")
val filterList = new FilterList()

rowPrefixes.foreach { x => filterList.addFilter(new PrefixFilter(Bytes.toBytes(x))) }

var scan = new Scan()  

scan.setFilter(filterList)
scan.addFamily(Bytes.toBytes("myCF"));

val rdd = hbaseContext.hbaseRDD(TableName.valueOf("tableName"), scan)
rdd.mapPartitions(populateCaseClass)

在您的情况下,table 扫描会发生太满,但只有 4 个分区会完成大量工作,假设您有足够的核心可用并且 par 可以为 rowPrefixes 数组中的每个元素分配一个核心。

希望这对您有所帮助。