如何扫描和删除 HBase 中的数百万行

How to scanning and deleting millions of rows in HBase

发生了什么
由于系统错误,上个月的所有数据都已损坏。所以我们必须手动删除并重新输入这些记录。基本上,我想删除在某个时间段内插入的所有行。但是,我发现很难在 HBase 中扫描和删除数百万行。

可能的解决方案
我找到了两种批量删除的方法:
第一个是设置一个TTL,这样系统会自动删除所有过时的记录。但是我想保留上个月之前插入的记录,所以这个解决方案对我不起作用。

第二个选项是使用 Java API:

编写客户端
 public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
    Table table = null;
    Connection connection = null;

    try {
        Scan scan = new Scan();
        scan.setTimeRange(minTime, maxTime);
        connection = HBaseOperator.getHbaseConnection();
        table = connection.getTable(TableName.valueOf(tableName));
        ResultScanner rs = table.getScanner(scan);

        List<Delete> list = getDeleteList(rs);
        if (list.size() > 0) {

            table.delete(list);
        }
    } catch (Exception e) {
        e.printStackTrace();

    } finally {
        if (null != table) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

private static List<Delete> getDeleteList(ResultScanner rs) {

    List<Delete> list = new ArrayList<>();
    try {

        for (Result r : rs) {
            Delete d = new Delete(r.getRow());
            list.add(d);
        }
    } finally {
        rs.close();
    }
    return list;
}

但在这种方法中,所有记录都存储在 ResultScanner rs 中,因此堆大小会很大。而且如果程序卡死了,还得从头开始。
那么,有没有更好的方法来达到目的呢?

不知道您在 table 中处理了多少 'millions',但最简单的方法是不要尝试一次将它们全部放入 List,而是使用 .next(n) 函数以更易于管理的步骤进行操作。像这样:

for (Result row : rs.next(numRows))
{
Delete del = new Delete(row.getRow());
...
}

这样,您可以通过 numRows 参数控制通过单个 RPC 从服务器返回的行数。确保它足够大,以免与服务器进行太多次往返,但同时又不要太大而无法杀死您的堆。您还可以使用 BufferedMutator 一次对多个 Delete 进行操作。

希望对您有所帮助。

我建议两个改进:

  1. 使用 BufferedMutator 批量删除,它完全满足您的需求 – 保留突变的内部缓冲区并在缓冲区填满时将其刷新到 HBase,因此您不必担心保留自己的列表, 调整大小并冲洗它。
  2. 改进扫描:
    • 使用 KeyOnlyFilter – 由于您不需要这些值,因此无需检索它们
    • 使用scan.setCacheBlocks(false) - 由于您进行了全面扫描table,因此在区域服务器上缓存所有块没有多大意义
    • 调整 scan.setCaching(N)scan.setBatch(N) – N 将取决于您的密钥的大小,您应该在缓存更多和所需的内存之间保持平衡;但由于您只传输密钥,我想 N 可能会很大。

这是您的代码的更新版本:

public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
    try (Connection connection = HBaseOperator.getHbaseConnection();
         final Table table = connection.getTable(TableName.valueOf(tableName));
         final BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName))) {

        Scan scan = new Scan();
        scan.setTimeRange(minTime, maxTime);
        scan.setFilter(new KeyOnlyFilter());
        scan.setCaching(1000);
        scan.setBatch(1000);
        scan.setCacheBlocks(false);
        try (ResultScanner rs = table.getScanner(scan)) {
            for (Result result : rs) {
                mutator.mutate(new Delete(result.getRow()));
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

注意 "try with resource" 的使用——如果您省略它,请确保 .close() mutatorrstableconnection.