Hbase周期性bulk load HFiles和minor compaction关系

Hbase bulk load HFiles periodically and minor compaction relation

我有这样的场景,我们必须定期将 HFiles 加载到 HBase table。

H 每个 运行 的文件大小可能在每个区域 50 到 150 MB 之间。这些负载可能是一天 12 次,在某些情况下可能是每 15 分钟一次。

在进行测试时,我观察到即使在区域中立即拥有超过 3 个文件后,也不会触发次要压缩。这可能会导致问题有很多文件保存相同行键的行。

我已经看到在 10000 秒(大约 2 小时 45 分钟)后唤醒的压缩线程正在开始压缩并将压缩任务放入队列中。

是否有任何配置可以告诉在批量加载 (completebulkload) 写入 3 个或更多 hFile 时立即触发次要压缩,而不管 HFile 的大小?

Hbase Version: HBase 1.1.2.2.6.5.4-1

Configuration:
   hbase.hstore.compaction.max = 10
   hbase.hstore.compactionThreshold = 3
   hbase.server.thread.wakefrequency = 10000

在查看 API 时,我发现可以在 hbase table 级别异步调用次要或主要压缩。

有 HBase Admin API 可用于根据需要调用压缩,如果批量加载更频繁地推送冗余数据并使用恒定区域拆分大小策略,则可以避免拆分。

这是在 Java 中执行此操作的示例代码:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class Compaction {

    private String tableName;
    private String compactionType;
    private Configuration conf;
    private Connection hbaseConnection;
    private String confFile = "/usr/hdp/current/hbase-client/conf/hbase-site.xml";
    private Admin admin;
    private Table table;
    private int sleepTime = 1 ;

    public Compaction(String tableName, String compactionType) {
        this.tableName = tableName;
        this.compactionType = compactionType;

    }

    private void initHBaseConnection() {
        this.conf = HBaseConfiguration.create();
        try {
            conf.addResource(new FileInputStream(new File(confFile )));
            hbaseConnection = ConnectionFactory.createConnection(conf);
            table = hbaseConnection.getTable(TableName.valueOf(tableName));
            admin = hbaseConnection.getAdmin();

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public boolean perfom() throws InterruptedException {


          System.out.println("Performing action: Compact table " + tableName + ", compact type =" + compactionType);
          try {
            if (compactionType.equalsIgnoreCase("major")) {
              admin.majorCompact(table.getName());
            } else {
              admin.compact(table.getName());
            }
          } catch (Exception ex) {
            System.err.println("Compaction failed, might be caused by other chaos: " + ex.getMessage());
            return false;
          }
          if (sleepTime  > 0) {
            Thread.sleep(sleepTime);
          }
        return true;
    }
    public static void main(String[] args) throws InterruptedException {
        String tableName = args[0];
        String compactionType = args[1];
        Compaction compaction = new Compaction(tableName, compactionType);
        compaction.initHBaseConnection();
        compaction.perfom();
    }

}