HBase 批量删除为 "complete bulk load"

HBase bulk delete as "complete bulk load"

我想删除 HBase 中的 3 亿行 table。我可以使用 HBase API 并发送一批 Delete 对象。但恐怕要花很多时间。

以前的代码就是这种情况,我想插入数百万行。我没有使用 HBase API 并发送一批 Puts,而是使用了一个 Map Reduce 作业,它发出 RowKey / Put 作为值并使用 HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator) 来设置我的 Reducer,以便它直接写入准备好的输出由 LoadIncrementalHFiles 快速加载(完全批量加载)。它要快得多(5 分钟而不是 3 小时)。

所以我想对批量删除做同样的事情。

但是,我似乎无法将此技术与 Delete 一起使用,因为 HFileOutputFormat2 尝试为 KeyValuePut (PutSortReducer) 配置 Reducer,但 Delete 不存在。

我的第一个问题是为什么没有 "DeleteSortReducer" 来为 Delete 启用完整的批量加载技术?是否只是遗漏了什么,还没有完成?还是有更深层次的理由来证明这一点?

第二个问题,有点相关:如果我 copy/paste PutSortReducer 的代码,将其改编为 Delete 并将其作为我工作的 Reducer 传递,它会起作用吗? HBase 完全批量加载是否会生成充满墓碑的 HFile?

示例:

public class DeleteSortReducer extends
        Reducer<ImmutableBytesWritable, Delete, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(
            ImmutableBytesWritable row,
            java.lang.Iterable<Delete> deletes,
            Reducer<ImmutableBytesWritable, Delete,
                    ImmutableBytesWritable, KeyValue>.Context context)
            throws java.io.IOException, InterruptedException
    {
        // although reduce() is called per-row, handle pathological case
        long threshold = context.getConfiguration().getLong(
                "putsortreducer.row.threshold", 1L * (1<<30));
        Iterator<Delete> iter = deletes.iterator();
        while (iter.hasNext()) {
            TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
            long curSize = 0;
            // stop at the end or the RAM threshold
            while (iter.hasNext() && curSize < threshold) {
                Delete d = iter.next();
                for (List<Cell> cells: d.getFamilyCellMap().values()) {
                    for (Cell cell: cells) {
                        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                        map.add(kv);
                        curSize += kv.heapSize();
                    }
                }
            }
            context.setStatus("Read " + map.size() + " entries of " + map.getClass()
                    + "(" + StringUtils.humanReadableInt(curSize) + ")");
            int index = 0;
            for (KeyValue kv : map) {
                context.write(row, kv);
                if (++index % 100 == 0)
                    context.setStatus("Wrote " + index);
            }

            // if we have more entries to process
            if (iter.hasNext()) {
                // force flush because we cannot guarantee intra-row sorted order
                context.write(null, null);
            }
        }
    }
}

首先,简单介绍一下删除操作在 HBase 中的工作原理。在删除命令中,HBase 将数据标记为已删除并将有关它的信息写入 HFile。实际上,数据并没有从光盘中删除,存储中存在两条记录:数据和删除标记。只有在压缩之后,数据才会从磁盘存储中删除。

所有这些信息表示为 KeyValue. For KeyValue for data representing has KeyValue.Type 等于 Put。对于删除标记 KeyValue.Type,设置以下值之一 DeleteDeleteColumnDeleteFamilyDeleteFamilyVersion

在您的情况下,您可以通过为 KeyValue.Type 创建具有特殊值的 KeyValue 来实现批量删除。例如,如果你想删除唯一的一列,你应该创建一个KeyValue,使用构造函数

KeyValue(byte[] row, byte[] family, byte[] qualifier, long timestamp, KeyValue.Type type)

// example 

KeyValue kv = new KeyValue(row, family, qualifier, time, KeyValue.Type.DeleteColumn)

第一个问题的答案不需要特别DeleteSortReducer,你应该为KeyValue配置一个reducer。对于第二个问题,答案是否定的。