Why does HBase KeyValueSortReducer need to sort all KeyValues?

I've been learning Phoenix CSV Bulk Load recently, and I found that the source of org.apache.phoenix.mapreduce.CsvToKeyValueReducer causes an OOM (java heap out of memory) when a single row is very large (in my case, a row has 44 columns and the average row size is 4KB).

Furthermore, this class is similar to the HBase bulk load reducer class, KeyValueSortReducer. That means an OOM may also happen in my case when KeyValueSortReducer is used.

So here is my question about KeyValueSortReducer: why does it need to sort all the KeyValues in a TreeSet first and then write them all to the context? If I removed the TreeSet sorting code and wrote all the KeyValues directly to the context, would the result be different or wrong?

Looking forward to your reply. Good luck!

Here is the source code of KeyValueSortReducer:

import java.util.TreeSet;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
      org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
  throws java.io.IOException, InterruptedException {
    // Buffer every KeyValue of this row in a TreeSet so that they are held
    // in KeyValue.COMPARATOR order.
    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
    for (KeyValue kv: kvs) {
      try {
        map.add(kv.clone());
      } catch (CloneNotSupportedException e) {
        throw new java.io.IOException(e);
      }
    }
    context.setStatus("Read " + map.getClass());
    int index = 0;
    // Emit the KeyValues in sorted order.
    for (KeyValue kv: map) {
      context.write(row, kv);
      if (++index % 100 == 0) context.setStatus("Wrote " + index);
    }
  }
}

Please take a look at this case study. There are requirements where the key-value pairs within the same row of an HFile need to be sorted.

1. The main question: why does HBase KeyValueSortReducer need to sort all KeyValues?

Thanks to RamPrasad G's reply, we can look at the case study: http://www.deerwalk.com/blog/bulk-importing-data/

This case study tells us more about HBase bulk importing and the reducer class KeyValueSortReducer. The reason all KeyValues are sorted in the reduce method of KeyValueSortReducer is that the HFile needs them in that order. You can focus on this part:

A frequently occurring problem while reducing is lexical ordering. It happens when keyvalue list to be outputted from reducer is not sorted. One example is when qualifier names for a single row are not written in lexically increasing order. Another being when multiple rows are written in same reduce method and row id’s are not written in lexically increasing order. It happens because reducer output is never sorted. All sorting occurs on keyvalue outputted by mapper and before it enters reduce method. So, it tries to add keyvalue’s outputted from reduce method in incremental fashion assuming that it is presorted. So, before keyvalue’s are written into context, they must be added into sorting list like TreeSet or HashSet with KeyValue.COMPARATOR as comparator and then writing them in order specified by sorted list.
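
To make the quoted point concrete, here is a minimal, hypothetical sketch (the row, column family, and qualifier names are made up for illustration) of how a TreeSet with KeyValue.COMPARATOR restores the lexical qualifier order that an HFile expects:

import java.util.TreeSet;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class SortKeyValuesExample {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row-1");
    byte[] cf  = Bytes.toBytes("d");

    // Qualifiers produced in non-lexical order, e.g. "q2" before "q1".
    KeyValue kv2 = new KeyValue(row, cf, Bytes.toBytes("q2"), Bytes.toBytes("v2"));
    KeyValue kv1 = new KeyValue(row, cf, Bytes.toBytes("q1"), Bytes.toBytes("v1"));

    // Writing kv2 then kv1 straight to the context would violate the
    // ascending key order that the HFile writer assumes is presorted.
    TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
    sorted.add(kv2);
    sorted.add(kv1);

    // Iteration order is now q1, q2 -- the order an HFile requires.
    for (KeyValue kv : sorted) {
      System.out.println(Bytes.toString(kv.getQualifier()));
    }
  }
}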

So, when a row has a lot of columns, the sort will use a lot of memory. As the source code of KeyValueSortReducer mentions:

/**
 * Emits sorted KeyValues.
 * Reads in all KeyValues from passed Iterator, sorts them, then emits
 * KeyValues in sorted order.  If lots of columns per row, it will use lots of
 * memory sorting.
 * @see HFileOutputFormat
 */

2. The related question: why does the Phoenix CSV BulkLoad reducer cause an OOM?

The reason the Phoenix CSV BulkLoad reducer causes an OOM is the issue referenced in PHOENIX-2649. Because the Comparator inside CsvTableRowKeyPair compares two CsvTableRowKeyPair objects in a way that makes all rows go through a single reducer in one single reduce call, in my case it led to an OOM very quickly.
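
To illustrate the kind of comparator problem described in PHOENIX-2649, here is a simplified, hypothetical sketch (not the actual Phoenix source; the class and field names are made up): if the composite key's compareTo ignores the row key, every row of a table compares as equal, gets grouped together by the shuffle, and arrives in one single reduce call, which forces one reducer to buffer everything in memory.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical composite key (table name, row key), only for illustration.
public class TableRowKeyPairSketch implements WritableComparable<TableRowKeyPairSketch> {
  private String tableName;
  private byte[] rowKey;

  @Override
  public int compareTo(TableRowKeyPairSketch other) {
    // Buggy comparison: only the table name is considered, so all rows of the
    // same table are treated as one key and handed to a single reduce call.
    return this.tableName.compareTo(other.tableName);

    // A correct comparator would fall back to the row key on ties, e.g.:
    // int cmp = this.tableName.compareTo(other.tableName);
    // return cmp != 0 ? cmp : Bytes.compareTo(this.rowKey, other.rowKey);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // serialization omitted for brevity
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // deserialization omitted for brevity
  }
}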

Fortunately, the Phoenix team fixed this issue in version 4.7. If your Phoenix version is below 4.7, please take note and try to upgrade your version, or you can apply the patch to your version.

Hope this answer helps!