使用 Structured Spark Streaming 在 HBase 中批量插入数据

Bulk Insert Data in HBase using Structured Spark Streaming

我正在使用 Structured Spark Streaming 读取来自 Kafka 的数据(每秒 100.000 行),我正在尝试将所有数据插入 HBase。

我使用的是 Cloudera Hadoop 2.6,我使用的是 Spark 2.3

我尝试了一些我见过的东西 here

eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()

MyHBaseWriter 看起来像这样:

class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... 
        p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))

        p
     }
    }

HBaseForeachWriter class 看起来像这样:

trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _


  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create HBase Connection Here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}

所以我在这里逐行放置,即使我允许每个执行器有 20 个和 4 个内核,我也没有立即将数据插入 HBase。所以我需要做的是批量加载,但我很挣扎,因为我在互联网上找到的所有内容都是用 RDD 和 Map/Reduce.

实现的

据我所知,hbase 中的记录摄取速度很慢。我有几个建议给你。

1) hbase.client.write.buffer .
以下属性可能对你有帮助。

hbase.client.write.buffer

Description Default size of the BufferedMutator write buffer in bytes. A bigger buffer takes more memory — on both the client and server side since server instantiates the passed write buffer to process it — but a larger buffer size reduces the number of RPCs made. For an estimate of server-side memory-used, evaluate hbase.client.write.buffer * hbase.regionserver.handler.count

Default 2097152 (around 2 mb )

我更喜欢 foreachBatch see spark docs(它是 spark 核心中的一种 foreachPartition)而不是 foreach

也在你的 hbase writer extends ForeachWriter

open方法初始化put数组列表 在 process 中将 put 添加到 puts 的数组列表中 在 close table.put(listofputs); 中,然后在更新 table...

后重置数组列表

它的作用基本上是您上面提到的缓冲区大小填充了 2 MB,然后它将刷新到 hbase table。到那时记录不会进入 hbase table.

您可以将其增加到 10mb 等等.... 这样 RPC 的数量将减少。大量数据将被刷新并存储在 hbase table.

当写入缓冲区已满并触发 flushCommits 到 hbase table 时。

示例代码:在我的

2) switch off WAL 你可以关掉WAL(write ahead log - 危险是无法恢复)但是会加速写...如果不想恢复数据。

Note : if you are using solr or cloudera search on hbase tables you should not turn it off since Solr will work on WAL. if you switch it off then, Solr indexing wont work.. this is one common mistake many of us does.

如何关闭: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean)

正如我提到的,puts 列表是个好方法...这是在结构化流式示例如下所示之前执行的旧方法(foreachPartition 和 puts 列表).. 其中 foreachPartition 对每个分区进行操作不是每一行。

def writeHbase(mydataframe: DataFrame) = {
      val columnFamilyName: String = "c"
      mydataframe.foreachPartition(rows => {
        val puts = new util.ArrayList[ Put ]
        rows.foreach(row => {
          val key = row.getAs[ String ]("rowKey")
          val p = new Put(Bytes.toBytes(key))
          val columnV = row.getAs[ Double ]("x")
          val columnT = row.getAs[ Long ]("y")
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("x"),
            Bytes.toBytes(columnX)
          )
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("y"),
            Bytes.toBytes(columnY)
          )
          puts.add(p)
        })
        HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
      })
    }

To sumup :

What I feel is we need to understand the psycology of spark and hbase to make then effective pair.