从 HBase 中提取数据的最快方法是什么
what is the Fastest way to extract data from HBase
我有大约 5TB 的数据分布在 HBase 的 30 个不同的 table 中。
我的用例是,基于每个 table 中的两个特定列,即 YEAR 和 Country,我必须创建 5K 个不同的文本文件。
我为此目的集成了 HIVE 和 HBase,但从 HIVE 中提取需要很长时间。
我必须在 10 小时内完成这件事。
寻求您的想法如何实现。
我对此有一些疑问。
- HIVE HBase 集成是好的方法吗?
- 使用 mapreduce 从 HBase 中提取数据是个好主意吗?
- 我不能使用 Apache Phoenix,因为它没有与 HBase 一起发布。
- IMPALA 也使用高内存,所以我的集群没有为此配置。
public int run(String[] args) throws Exception {
int result = 0;
if (hbaseConf == null)
hbaseConf = getHbaseConfiguration();
Job job = new Job(hbaseConf);
job.setJarByClass(HBaseToFileDriver.class);
job.setJobName("Importing Data from HBase to File:::" + args[0]);
Scan scan = new Scan();
scan.setCaching(5000); // 1 is the default in Scan, which will be bad
// for
// MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.addFamily(Bytes.toBytes("cf"));
TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class, null, null, job);
// No reducers. Just write straight to output files.
job.setNumReduceTasks(0);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Result.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
return result;
}
}
我在 HBase 中的数据就像
���U"9����|Japan|2012 48433172245 1001371402 FundamentalSeries NULL NULL 139 238474518 1.65494205533344 Price2SFCFLPsr NULL False 3011645 1000190205 False True I Japan 2012
C��t�I�\���7|ThirdPartyPrivate|2009 48934711562 1001371402 FundamentalSeries NULL NULL 9 5631268 21.2315827835749 STCA_PoP NULL False 3011645 1000193170 False True I ThirdPartyPrivate 2009
�����^Z4Ga�|Japan|2013 48433158708 1001371402 FundamentalSeries NULL NULL 507 160531379 1.1248E10 STAX_TTM 500186 False 3011646 1000193168 False False I Japan 2013
G\�=�HO�S�|Japan|2008 48433173983 1001371402 FundamentalSeries NULL NULL 153 1961706488 0.500256556630127 RIBEIT_TTM NULL False 3011646 1000193016 False False I Japan 2008
�G��G�i0�]|Japan|2012 48433336633 1001371402 FundamentalSeries NULL NULL 894 3112047463 14.3904580667924 Ev2SEBIT_Avg5 NULL False 3011645 1000190030 False True I Japan 2012
���r����/8|Japan|2015 48433251137 1001371402 FundamentalSeries NULL NULL 200 2907364871 -46.9431625157866 SNOPA_YoY NULL False 3011646 1000423629 False False I Japan 2015
�)H�<�����t|Japan|2008 48433139729 1001371402 FundamentalSeries NULL NULL 1170 2604636883 0.267980759053007 PPE2ANOA NULL False 3011646 1001262486 False False I Japan 2008
'H�&�g���|Japan|2005 48433195827 1001371402 FundamentalSeries NULL NULL 147 450289107 0.540110660915134 Ev2SEBIT NULL False 3011645 1000190028 False True I Japan 2005
c�\��17ɟ�|Japan|2013 48433160145 1001371402 FundamentalSeries NULL NULL 885 2010667500 -19.6553084635268 SAMI_TTM_YoY NULL False 3011646 1000190297 False False I Japan 2013
j���}��||Japan|2010 48433159175 1001371402 FundamentalSeries NULL NULL 214 420693538 -17.3468681844827 SCOR_YoY NULL False 3011646 1000192789 False False I Japan 2010
选项 1:请注意,hive hbase 集成和查询 hive 也将在后台使用 mapreduce...
但是您无法对 hive 执行的 mapreduce 进行细粒度控制。
选项3:你也排除了你提到的选项3即凤凰
选项 4:Impala 速度更快,但您有某些限制。所以排除了
选项 2:根据我使用 hbase 的经验,我会提供使用 mapreduce 从 HBase 中提取数据。即您的选项 2 将对作业的执行进行更精细的控制。
但在这种方法中,您还必须微调您的工作。
scan.setCaching(500);
scan.setCacheBlocks(false);
- 最重要的是你必须设计你的 rowkey 以避免 hot spotting and use efficient filters (like
FuzzyRowFilter
for instance see here),以确保快速访问。
- 尽量避免使用列值过滤器,以确保不会发生完整 table 扫描。
- 请注意,table 的区域数量等于为该特定作业启动的映射器数量。因此,将 table 预先拆分到某个范围(例如 0-9)之间,以便您的所有行都属于这些确定的区域(当然它可以进一步拆分为多个区域,但这是一种方式如果确保更少的区域数量,因此所有映射器都会获得足够数量的记录来处理...)
如果我没看错的话。你想生成多个序列文件;
请查看使用 MultipleOutputs 的使用模式。
see Usage pattern for job submission:
Job job = new Job();
FileInputFormat.setInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
...
// Defines additional single text based output 'text' for the job
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
LongWritable.class, Text.class);
// Defines additional sequence-file based output 'sequence' for the job
MultipleOutputs.addNamedOutput(job, "seq",
SequenceFileOutputFormat.class,
LongWritable.class, Text.class);
...
job.waitForCompletion(true);
...
我有大约 5TB 的数据分布在 HBase 的 30 个不同的 table 中。 我的用例是,基于每个 table 中的两个特定列,即 YEAR 和 Country,我必须创建 5K 个不同的文本文件。 我为此目的集成了 HIVE 和 HBase,但从 HIVE 中提取需要很长时间。 我必须在 10 小时内完成这件事。 寻求您的想法如何实现。 我对此有一些疑问。
- HIVE HBase 集成是好的方法吗?
- 使用 mapreduce 从 HBase 中提取数据是个好主意吗?
- 我不能使用 Apache Phoenix,因为它没有与 HBase 一起发布。
- IMPALA 也使用高内存,所以我的集群没有为此配置。
public int run(String[] args) throws Exception {
int result = 0;
if (hbaseConf == null)
hbaseConf = getHbaseConfiguration();
Job job = new Job(hbaseConf);
job.setJarByClass(HBaseToFileDriver.class);
job.setJobName("Importing Data from HBase to File:::" + args[0]);
Scan scan = new Scan();
scan.setCaching(5000); // 1 is the default in Scan, which will be bad
// for
// MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.addFamily(Bytes.toBytes("cf"));
TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class, null, null, job);
// No reducers. Just write straight to output files.
job.setNumReduceTasks(0);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Result.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
return result;
}
}
我在 HBase 中的数据就像
���U"9����|Japan|2012 48433172245 1001371402 FundamentalSeries NULL NULL 139 238474518 1.65494205533344 Price2SFCFLPsr NULL False 3011645 1000190205 False True I Japan 2012
C��t�I�\���7|ThirdPartyPrivate|2009 48934711562 1001371402 FundamentalSeries NULL NULL 9 5631268 21.2315827835749 STCA_PoP NULL False 3011645 1000193170 False True I ThirdPartyPrivate 2009
�����^Z4Ga�|Japan|2013 48433158708 1001371402 FundamentalSeries NULL NULL 507 160531379 1.1248E10 STAX_TTM 500186 False 3011646 1000193168 False False I Japan 2013
G\�=�HO�S�|Japan|2008 48433173983 1001371402 FundamentalSeries NULL NULL 153 1961706488 0.500256556630127 RIBEIT_TTM NULL False 3011646 1000193016 False False I Japan 2008
�G��G�i0�]|Japan|2012 48433336633 1001371402 FundamentalSeries NULL NULL 894 3112047463 14.3904580667924 Ev2SEBIT_Avg5 NULL False 3011645 1000190030 False True I Japan 2012
���r����/8|Japan|2015 48433251137 1001371402 FundamentalSeries NULL NULL 200 2907364871 -46.9431625157866 SNOPA_YoY NULL False 3011646 1000423629 False False I Japan 2015
�)H�<�����t|Japan|2008 48433139729 1001371402 FundamentalSeries NULL NULL 1170 2604636883 0.267980759053007 PPE2ANOA NULL False 3011646 1001262486 False False I Japan 2008
'H�&�g���|Japan|2005 48433195827 1001371402 FundamentalSeries NULL NULL 147 450289107 0.540110660915134 Ev2SEBIT NULL False 3011645 1000190028 False True I Japan 2005
c�\��17ɟ�|Japan|2013 48433160145 1001371402 FundamentalSeries NULL NULL 885 2010667500 -19.6553084635268 SAMI_TTM_YoY NULL False 3011646 1000190297 False False I Japan 2013
j���}��||Japan|2010 48433159175 1001371402 FundamentalSeries NULL NULL 214 420693538 -17.3468681844827 SCOR_YoY NULL False 3011646 1000192789 False False I Japan 2010
选项 1:请注意,hive hbase 集成和查询 hive 也将在后台使用 mapreduce...
但是您无法对 hive 执行的 mapreduce 进行细粒度控制。
选项3:你也排除了你提到的选项3即凤凰
选项 4:Impala 速度更快,但您有某些限制。所以排除了
选项 2:根据我使用 hbase 的经验,我会提供使用 mapreduce 从 HBase 中提取数据。即您的选项 2 将对作业的执行进行更精细的控制。
但在这种方法中,您还必须微调您的工作。
scan.setCaching(500);
scan.setCacheBlocks(false);
- 最重要的是你必须设计你的 rowkey 以避免 hot spotting and use efficient filters (like
FuzzyRowFilter
for instance see here),以确保快速访问。 - 尽量避免使用列值过滤器,以确保不会发生完整 table 扫描。
- 请注意,table 的区域数量等于为该特定作业启动的映射器数量。因此,将 table 预先拆分到某个范围(例如 0-9)之间,以便您的所有行都属于这些确定的区域(当然它可以进一步拆分为多个区域,但这是一种方式如果确保更少的区域数量,因此所有映射器都会获得足够数量的记录来处理...)
如果我没看错的话。你想生成多个序列文件;
请查看使用 MultipleOutputs 的使用模式。
see Usage pattern for job submission:
Job job = new Job();
FileInputFormat.setInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
...
// Defines additional single text based output 'text' for the job
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
LongWritable.class, Text.class);
// Defines additional sequence-file based output 'sequence' for the job
MultipleOutputs.addNamedOutput(job, "seq",
SequenceFileOutputFormat.class,
LongWritable.class, Text.class);
...
job.waitForCompletion(true);
...