Hadoop mapreduce - reducer not running
I am trying to write a customized bulk-load map-reduce job into HBase, and I am running into a problem with the reducer. At first I thought I had written the reducer badly, but after throwing runtime exceptions inside the reducer and seeing the job run to completion anyway, I realized that the reducer is not running at all.
So far I have found nothing wrong with my code against the common answers to this kind of question:
- My configuration sets the map output classes and the final output classes separately
- My reducer and mapper carry @Override annotations
- I have an Iterable, and my reducer input is (writable, put), so...
Here is my code:
Driver
public int run(String[] args) throws Exception {
    int result = 0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name", TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
    Job job = new Job(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::" + TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);
    job.setReducerClass(HBaseBulkLoadReducer.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    FileInputFormat.addInputPaths(job, args[0]);
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setMapOutputValueClass(Put.class);
    job.setNumReduceTasks(1);
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration, TABLE_NAME));
    job.waitForCompletion(true);
    return result;
}
Mapper
public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private ImmutableBytesWritable hbaseTableName;

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(hbaseTable));
    }

    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            String rowKey = values[0];
            Put put = new Put(Bytes.toBytes(rowKey));
            // BUNCH OF ADDS (column writes elided)
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
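The elided adds are just the column writes; a hypothetical example with invented qualifier names (not my actual columns) would look like this:

// Hypothetical illustration of the elided adds (qualifier names invented):
put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("col1"), Bytes.toBytes(values[1]));
put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("col2"), Bytes.toBytes(values[2]));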
Reducer
public class HBaseBulkLoadReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts,
            Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put>.Context context)
            throws java.io.IOException, InterruptedException {
        TreeMap<String, KeyValue> map = new TreeMap<String, KeyValue>();
        int count = 0;
        Append nkv;
        byte[] tmp = "".getBytes();
        Put pp = new Put(tmp);
        try {
            for (Put p : puts) {
                byte[] r = "".getBytes();
                //KeyValue kv = new KeyValue(r);
                if (count != 0) {
                    r = p.getRow();
                    pp.add(new KeyValue(r));
                    //KeyValue k = map.get(row.toString());
                    //nkv = new Append(k.getRowArray());
                    //nkv=nkv.add(kv);
                    //map.put(row.toString(), k.clone());
                    //context.write(row,nkv);
                    //tmp=ArrayUtils.addAll(tmp,kv.getValueArray());
                    //map.put(row.toString(),new KeyValue(kv.getRowArray(),kv.getFamilyArray(),kv.getQualifierArray(),tmp));
                    count++;
                    throw new RuntimeException();
                } else {
                    r = p.getRow();
                    pp = new Put(row.toString().getBytes());
                    pp.add(new KeyValue(r));
                    //tmp=kv.clone().getValueArray();
                    //nkv = new Append(kv.getRowArray());
                    //map.put(row.toString(), kv.clone());
                    count++;
                    throw new RuntimeException();
                }
            }
            context.write(row, pp);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
OK, I know the reducer is a bit messy, but the fact is, as you can see, it throws a RuntimeException in both the if and the else branch, and the bulk load still completes successfully, so I am pretty sure the reducer is not running, and I am not sure why. All three files are packaged in the same directory, FYI.
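One quick way to check which reducer class the job has actually ended up with (a minimal diagnostic sketch using the standard mapreduce Job API) is to print it just before submission:

// Print the reducer class actually registered on the job right before submitting;
// if it is not HBaseBulkLoadReducer, something has replaced it.
System.out.println("Reducer: " + job.getReducerClass().getName());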
Found the problem. configureIncrementalLoad sets the reducer class to PutSortReducer or KeyValueSortReducer depending on the map output value class, so if I want to use a custom reducer class I have to set it after calling configureIncrementalLoad. Once I did that I could see the reducer running. Just answering my own question so it may help anyone who runs into the same problem.
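What configureIncrementalLoad does internally looks roughly like this (a simplified sketch based on my reading of the HBase 0.94-era source; exact details vary by version):

// Simplified sketch of HFileOutputFormat.configureIncrementalLoad.
// Note how it unconditionally replaces whatever reducer was set before it.
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class); // <- this overwrote my reducer
}

So the fix is to set the custom reducer class after the configureIncrementalLoad call: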
HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration, TABLE_NAME));
job.setReducerClass(HBaseBulkLoadReducer.class);
job.waitForCompletion(true);
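One caveat I should mention: PutSortReducer and KeyValueSortReducer exist because HFileOutputFormat requires the KeyValues it writes to arrive in sorted order, so a custom reducer that replaces them has to preserve that ordering itself or the write will fail with a "key not lexically larger than previous" error. A minimal sketch of what that could look like (my own illustration against the 0.94-era API, not the code above):

// Sketch: a custom reducer that still satisfies HFileOutputFormat's
// ordering requirement by sorting all KeyValues for a row before writing.
public class SortingBulkLoadReducer
        extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
            throws IOException, InterruptedException {
        // Collect every KeyValue of every Put for this row into a sorted set.
        TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
        for (Put p : puts) {
            for (List<KeyValue> kvs : p.getFamilyMap().values()) {
                sorted.addAll(kvs);
            }
        }
        // Emit in lexical order, as HFileOutputFormat expects.
        for (KeyValue kv : sorted) {
            context.write(row, kv);
        }
    }
}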