如何扫描和删除 HBase 中的数百万行
How to scanning and deleting millions of rows in HBase
发生了什么
由于系统错误,上个月的所有数据都已损坏。所以我们必须手动删除并重新输入这些记录。基本上,我想删除在某个时间段内插入的所有行。但是,我发现很难在 HBase 中扫描和删除数百万行。
可能的解决方案
我找到了两种批量删除的方法:
第一个是设置一个TTL,这样系统会自动删除所有过时的记录。但是我想保留上个月之前插入的记录,所以这个解决方案对我不起作用。
第二个选项是使用 Java API:
编写客户端
public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
Table table = null;
Connection connection = null;
try {
Scan scan = new Scan();
scan.setTimeRange(minTime, maxTime);
connection = HBaseOperator.getHbaseConnection();
table = connection.getTable(TableName.valueOf(tableName));
ResultScanner rs = table.getScanner(scan);
List<Delete> list = getDeleteList(rs);
if (list.size() > 0) {
table.delete(list);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != table) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private static List<Delete> getDeleteList(ResultScanner rs) {
List<Delete> list = new ArrayList<>();
try {
for (Result r : rs) {
Delete d = new Delete(r.getRow());
list.add(d);
}
} finally {
rs.close();
}
return list;
}
但在这种方法中,所有记录都存储在 ResultScanner rs
中,因此堆大小会很大。而且如果程序卡死了,还得从头开始。
那么,有没有更好的方法来达到目的呢?
不知道您在 table 中处理了多少 'millions',但最简单的方法是不要尝试一次将它们全部放入 List
,而是使用 .next(n)
函数以更易于管理的步骤进行操作。像这样:
for (Result row : rs.next(numRows))
{
Delete del = new Delete(row.getRow());
...
}
这样,您可以通过 numRows
参数控制通过单个 RPC
从服务器返回的行数。确保它足够大,以免与服务器进行太多次往返,但同时又不要太大而无法杀死您的堆。您还可以使用 BufferedMutator
一次对多个 Delete
进行操作。
希望对您有所帮助。
我建议两个改进:
- 使用
BufferedMutator
批量删除,它完全满足您的需求 – 保留突变的内部缓冲区并在缓冲区填满时将其刷新到 HBase,因此您不必担心保留自己的列表, 调整大小并冲洗它。
- 改进扫描:
- 使用
KeyOnlyFilter
– 由于您不需要这些值,因此无需检索它们
- 使用
scan.setCacheBlocks(false)
- 由于您进行了全面扫描table,因此在区域服务器上缓存所有块没有多大意义
- 调整
scan.setCaching(N)
和 scan.setBatch(N)
– N 将取决于您的密钥的大小,您应该在缓存更多和所需的内存之间保持平衡;但由于您只传输密钥,我想 N
可能会很大。
这是您的代码的更新版本:
public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
try (Connection connection = HBaseOperator.getHbaseConnection();
final Table table = connection.getTable(TableName.valueOf(tableName));
final BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName))) {
Scan scan = new Scan();
scan.setTimeRange(minTime, maxTime);
scan.setFilter(new KeyOnlyFilter());
scan.setCaching(1000);
scan.setBatch(1000);
scan.setCacheBlocks(false);
try (ResultScanner rs = table.getScanner(scan)) {
for (Result result : rs) {
mutator.mutate(new Delete(result.getRow()));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
注意 "try with resource" 的使用——如果您省略它,请确保 .close()
mutator
、rs
、table
和 connection
.
发生了什么
由于系统错误,上个月的所有数据都已损坏。所以我们必须手动删除并重新输入这些记录。基本上,我想删除在某个时间段内插入的所有行。但是,我发现很难在 HBase 中扫描和删除数百万行。
可能的解决方案
我找到了两种批量删除的方法:
第一个是设置一个TTL,这样系统会自动删除所有过时的记录。但是我想保留上个月之前插入的记录,所以这个解决方案对我不起作用。
第二个选项是使用 Java API:
编写客户端 public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
Table table = null;
Connection connection = null;
try {
Scan scan = new Scan();
scan.setTimeRange(minTime, maxTime);
connection = HBaseOperator.getHbaseConnection();
table = connection.getTable(TableName.valueOf(tableName));
ResultScanner rs = table.getScanner(scan);
List<Delete> list = getDeleteList(rs);
if (list.size() > 0) {
table.delete(list);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != table) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private static List<Delete> getDeleteList(ResultScanner rs) {
List<Delete> list = new ArrayList<>();
try {
for (Result r : rs) {
Delete d = new Delete(r.getRow());
list.add(d);
}
} finally {
rs.close();
}
return list;
}
但在这种方法中,所有记录都存储在 ResultScanner rs
中,因此堆大小会很大。而且如果程序卡死了,还得从头开始。
那么,有没有更好的方法来达到目的呢?
不知道您在 table 中处理了多少 'millions',但最简单的方法是不要尝试一次将它们全部放入 List
,而是使用 .next(n)
函数以更易于管理的步骤进行操作。像这样:
for (Result row : rs.next(numRows))
{
Delete del = new Delete(row.getRow());
...
}
这样,您可以通过 numRows
参数控制通过单个 RPC
从服务器返回的行数。确保它足够大,以免与服务器进行太多次往返,但同时又不要太大而无法杀死您的堆。您还可以使用 BufferedMutator
一次对多个 Delete
进行操作。
希望对您有所帮助。
我建议两个改进:
- 使用
BufferedMutator
批量删除,它完全满足您的需求 – 保留突变的内部缓冲区并在缓冲区填满时将其刷新到 HBase,因此您不必担心保留自己的列表, 调整大小并冲洗它。 - 改进扫描:
- 使用
KeyOnlyFilter
– 由于您不需要这些值,因此无需检索它们 - 使用
scan.setCacheBlocks(false)
- 由于您进行了全面扫描table,因此在区域服务器上缓存所有块没有多大意义 - 调整
scan.setCaching(N)
和scan.setBatch(N)
– N 将取决于您的密钥的大小,您应该在缓存更多和所需的内存之间保持平衡;但由于您只传输密钥,我想N
可能会很大。
- 使用
这是您的代码的更新版本:
public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
try (Connection connection = HBaseOperator.getHbaseConnection();
final Table table = connection.getTable(TableName.valueOf(tableName));
final BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName))) {
Scan scan = new Scan();
scan.setTimeRange(minTime, maxTime);
scan.setFilter(new KeyOnlyFilter());
scan.setCaching(1000);
scan.setBatch(1000);
scan.setCacheBlocks(false);
try (ResultScanner rs = table.getScanner(scan)) {
for (Result result : rs) {
mutator.mutate(new Delete(result.getRow()));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
注意 "try with resource" 的使用——如果您省略它,请确保 .close()
mutator
、rs
、table
和 connection
.