在 HBase 单元中保存多个版本
Saving multiple versions in HBase cell
我是 HBase 新手。我试图在 HBase 的一个单元格中保存多个版本,但我只得到最后保存的值。我尝试了以下两个命令来检索多个保存的版本:
get 'Dummy1','abc', {COLUMN=>'backward:first', VERSIONS=>12}
和 scan 'Dummy1', {VERSIONS=>12}
两者都返回如下输出:
ROW COLUMN+CELL
abc column=backward:first, timestamp=1422722312845, value=rrb
1 行在 0.0150 秒内
输入文件如下:
abc xyz kkk
abc qwe asd
abc anf rrb
在HBase中创建Table的代码如下:
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HBaseTableCreator {
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
conf.set("hbase.master","localhost:60000");
HBaseAdmin hbase = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor("Dummy");
HColumnDescriptor meta = new HColumnDescriptor("backward".getBytes());
meta.setMaxVersions(Integer.MAX_VALUE);
HColumnDescriptor prefix = new HColumnDescriptor("forward".getBytes());
prefix.setMaxVersions(Integer.MAX_VALUE);
desc.addFamily(meta);
desc.addFamily(prefix);
hbase.createTable(desc);
}
}
Dump数据到HBase的代码如下:
主要Class:
导入 java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
{
// TODO Auto-generated method stub
Configuration conf=new Configuration();
//HTable hTable = new HTable(conf, args[3]);
String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length!=2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job=new Job(conf,"HBase dummy dump");
job.setJarByClass(TestMain.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(TestMapper.class);
TableMapReduceUtil.initTableReducerJob("Dummy", null, job);
//job.setOutputKeyClass(NullWritable.class);
//job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
//job.setOutputKeyClass(Text.class);
//job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
//HFileOutputFormat.configureIncrementalLoad(job, hTable);
System.exit(job.waitForCompletion(true)?0:1);
}
}
映射器Class:
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class TestMapper extends Mapper <LongWritable, Text, Text, Put>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line=value.toString();
String[] l=line.split("\s+");
for(int i=1;i<l.length;i++)
{
Put HPut = new Put(l[0].getBytes());
HPut.add("backward".getBytes(),"first".getBytes(),l[i].getBytes());
context.write(new Text(l[0]),HPut);
}
}
}
请告诉我哪里错了。
您的问题是您的写入会自动进行批处理,并在作业结束时刷新(当 table 关闭时),可能导致每个放置操作都具有完全相同的时间戳,他们基本上是在覆盖自己(编写一个与另一个版本具有相同时间戳的版本会覆盖该版本,而不是插入一个新版本)。
解决问题的第一种方法可能是自己用 Put HPut = new Put(l[0].getBytes(), System.currentTimeMillis());
提供时间戳,但您可能会遇到同样的问题,因为操作速度太快以至于很多 puts 将具有相同的时间戳.
这就是我要克服这个问题的方法:
1- 停止使用 TableMapReduceUtil.initTableReducerJob
转而使用处理写入 hbase table.
的自定义减速器
2- 修改映射器以将每一行的所有值写入上下文,以便将它们分组为一个可迭代对象并传递给缩减器(即:abc, xyz kkk qwe asd anf rrb
)
3- 实现我自己的减速器以像这样工作 伪代码:
Define myHTable
setup() {
Instantiate myHtable
Disable myHtable autoflush to prevent puts from being automatically flushed
Set myHtable write buffer to at least 2MB
}
reduce(rowkey, results) {
baseTimestamp = current time in milliseconds
Iterate results {
Instantiate put with rowkey ++baseTimestamp
Add result to put
Send put to myHTable
}
}
cleanup() {
Flush commits for myHTable
Close myHTable
}
这样,每个版本之间总是有 1ms 的间隔,唯一需要注意的是,如果你有大量的版本并且 运行 多次执行相同的作业,新作业的时间戳可能会与前一个作业的时间戳重叠,如果您预计版本少于 30k,则不必担心,因为每个作业与下一个作业至少相距 30 秒...
无论如何,请注意,不建议拥有超过一百个版本 (http://hbase.apache.org/book.html#versions),如果您需要更多版本,则采用 tall 方法(包含 key+ 的复合行键时间戳),完全没有版本。
抱歉奇怪的格式,这是唯一能很好地显示伪代码的方法。
我是 HBase 新手。我试图在 HBase 的一个单元格中保存多个版本,但我只得到最后保存的值。我尝试了以下两个命令来检索多个保存的版本:
get 'Dummy1','abc', {COLUMN=>'backward:first', VERSIONS=>12}
和 scan 'Dummy1', {VERSIONS=>12}
两者都返回如下输出:
ROW COLUMN+CELL
abc column=backward:first, timestamp=1422722312845, value=rrb
1 行在 0.0150 秒内 输入文件如下:
abc xyz kkk
abc qwe asd
abc anf rrb
在HBase中创建Table的代码如下:
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HBaseTableCreator {
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
conf.set("hbase.master","localhost:60000");
HBaseAdmin hbase = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor("Dummy");
HColumnDescriptor meta = new HColumnDescriptor("backward".getBytes());
meta.setMaxVersions(Integer.MAX_VALUE);
HColumnDescriptor prefix = new HColumnDescriptor("forward".getBytes());
prefix.setMaxVersions(Integer.MAX_VALUE);
desc.addFamily(meta);
desc.addFamily(prefix);
hbase.createTable(desc);
}
}
Dump数据到HBase的代码如下: 主要Class: 导入 java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
{
// TODO Auto-generated method stub
Configuration conf=new Configuration();
//HTable hTable = new HTable(conf, args[3]);
String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length!=2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job=new Job(conf,"HBase dummy dump");
job.setJarByClass(TestMain.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(TestMapper.class);
TableMapReduceUtil.initTableReducerJob("Dummy", null, job);
//job.setOutputKeyClass(NullWritable.class);
//job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
//job.setOutputKeyClass(Text.class);
//job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
//HFileOutputFormat.configureIncrementalLoad(job, hTable);
System.exit(job.waitForCompletion(true)?0:1);
}
}
映射器Class:
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class TestMapper extends Mapper <LongWritable, Text, Text, Put>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line=value.toString();
String[] l=line.split("\s+");
for(int i=1;i<l.length;i++)
{
Put HPut = new Put(l[0].getBytes());
HPut.add("backward".getBytes(),"first".getBytes(),l[i].getBytes());
context.write(new Text(l[0]),HPut);
}
}
}
请告诉我哪里错了。
您的问题是您的写入会自动进行批处理,并在作业结束时刷新(当 table 关闭时),可能导致每个放置操作都具有完全相同的时间戳,他们基本上是在覆盖自己(编写一个与另一个版本具有相同时间戳的版本会覆盖该版本,而不是插入一个新版本)。
解决问题的第一种方法可能是自己用 Put HPut = new Put(l[0].getBytes(), System.currentTimeMillis());
提供时间戳,但您可能会遇到同样的问题,因为操作速度太快以至于很多 puts 将具有相同的时间戳.
这就是我要克服这个问题的方法:
1- 停止使用 TableMapReduceUtil.initTableReducerJob
转而使用处理写入 hbase table.
2- 修改映射器以将每一行的所有值写入上下文,以便将它们分组为一个可迭代对象并传递给缩减器(即:abc, xyz kkk qwe asd anf rrb
)
3- 实现我自己的减速器以像这样工作 伪代码:
Define myHTable
setup() {
Instantiate myHtable
Disable myHtable autoflush to prevent puts from being automatically flushed
Set myHtable write buffer to at least 2MB
}
reduce(rowkey, results) {
baseTimestamp = current time in milliseconds
Iterate results {
Instantiate put with rowkey ++baseTimestamp
Add result to put
Send put to myHTable
}
}
cleanup() {
Flush commits for myHTable
Close myHTable
}
这样,每个版本之间总是有 1ms 的间隔,唯一需要注意的是,如果你有大量的版本并且 运行 多次执行相同的作业,新作业的时间戳可能会与前一个作业的时间戳重叠,如果您预计版本少于 30k,则不必担心,因为每个作业与下一个作业至少相距 30 秒...
无论如何,请注意,不建议拥有超过一百个版本 (http://hbase.apache.org/book.html#versions),如果您需要更多版本,则采用 tall 方法(包含 key+ 的复合行键时间戳),完全没有版本。
抱歉奇怪的格式,这是唯一能很好地显示伪代码的方法。