org.apache.hadoop.mapred.MapTask$NewOutputCollector 在关闭期间忽略异常
Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector
我创建了一个 hadoop 自定义可写对象,如下所示
public class ResultType implements Writable {
private Text xxxx;
private Text yyyy;
private Text zzzz;
public ResultType() {}
public ResultType(Text xxxx, Text yyyy, Text zzzz) {
this.xxxx = xxxx;
this.yyyy = yyyy;
this.zzzz = zzzz;
}
public Text getxxxx() {
return this.xxxx;
}
public Text getyyyy() {
return this.yyyy;
}
public Text getzzzz() {
return this.zzzz;
}
@Override
public void readFields(DataInput in) throws IOException {
xxxx.readFields(in);
yyyy.readFields(in);
zzzz.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
xxxx.write(out);
yyyy.write(out);
zzzz.write(out);
}
}
我的映射器代码是
public static class Mapper1 extends TableMapper<Text, ResultType> {
private Text text = new Text();
@Override
public void map(ImmutableBytesWritable row, Result values, Context context)
throws IOException, InterruptedException {
// getting name value
String xxxx = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("xxxx")));
String yyyy = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("yyyy")));
String zzzz = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("zzzz")));
text.set(xxxx);
context.write(text, new ResultType(new Text(xxxx), new Text(yyyy), new Text(zzzz)));
}
}
我的 Reducer 代码是
public static class Reducer1 extends Reducer<Text, ResultType, Text, ResultType> {
public void reduce(Text key, Iterable<ResultType> values, Context context)
throws IOException, InterruptedException {
List<ResultType> returnset = new ArrayList<ResultType>();
Map<String, ResultType> duplicatelist = new HashMap<String, ResultType>();
boolean iskeyadded = true;
for (ResultType val : values) {
Text yyyy = val.getyyyy();
Text zzzz = val.getzzzz();
String groupkey = yyyy + "," + zzzz ;
if (duplicatelist.containsKey(groupkey)) {
if (iskeyadded) {
context.write(key, new ResultType(new Text(key), new Text(yyyy),
new Text(zzzz)));
iskeyadded = false;
}
context.write(key, new ResultType(new Text(key), new Text(yyyy), new Text(zzzz)));
} else {
duplicatelist.put(groupkey, val);
}
}
}
}
当我 运行 我得到这个代码时
Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@20890b6f
java.lang.NullPointerException
at test.ResultType.readFields(ResultType.java)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1688)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1637)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1489)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2019)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:797)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
您得到了 NullPointerException
,因为您的自定义可写对象中的 none 个 Text
对象是在任何地方创建的。您可以在 class.
顶部声明它们的地方创建它们
private Text xxxx = new Text();
private Text yyyy = new Text();
private Text zzzz = new Text();
我还建议您将设置它们的构造函数更改为:
public ResultType(Text xxxx, Text yyyy, Text zzzz) {
this.xxxx.set(xxxx);
this.yyyy.set(yyyy);
this.zzzz.set(zzzz);
}
与字符串不同,Text 对象不是不可变的,因此使它们相等不会创建新的 Text 对象。如果您尝试在别处重用 Text 对象,这将导致问题。
我创建了一个 hadoop 自定义可写对象,如下所示
public class ResultType implements Writable {
private Text xxxx;
private Text yyyy;
private Text zzzz;
public ResultType() {}
public ResultType(Text xxxx, Text yyyy, Text zzzz) {
this.xxxx = xxxx;
this.yyyy = yyyy;
this.zzzz = zzzz;
}
public Text getxxxx() {
return this.xxxx;
}
public Text getyyyy() {
return this.yyyy;
}
public Text getzzzz() {
return this.zzzz;
}
@Override
public void readFields(DataInput in) throws IOException {
xxxx.readFields(in);
yyyy.readFields(in);
zzzz.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
xxxx.write(out);
yyyy.write(out);
zzzz.write(out);
}
}
我的映射器代码是
public static class Mapper1 extends TableMapper<Text, ResultType> {
private Text text = new Text();
@Override
public void map(ImmutableBytesWritable row, Result values, Context context)
throws IOException, InterruptedException {
// getting name value
String xxxx = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("xxxx")));
String yyyy = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("yyyy")));
String zzzz = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("zzzz")));
text.set(xxxx);
context.write(text, new ResultType(new Text(xxxx), new Text(yyyy), new Text(zzzz)));
}
}
我的 Reducer 代码是
public static class Reducer1 extends Reducer<Text, ResultType, Text, ResultType> {
public void reduce(Text key, Iterable<ResultType> values, Context context)
throws IOException, InterruptedException {
List<ResultType> returnset = new ArrayList<ResultType>();
Map<String, ResultType> duplicatelist = new HashMap<String, ResultType>();
boolean iskeyadded = true;
for (ResultType val : values) {
Text yyyy = val.getyyyy();
Text zzzz = val.getzzzz();
String groupkey = yyyy + "," + zzzz ;
if (duplicatelist.containsKey(groupkey)) {
if (iskeyadded) {
context.write(key, new ResultType(new Text(key), new Text(yyyy),
new Text(zzzz)));
iskeyadded = false;
}
context.write(key, new ResultType(new Text(key), new Text(yyyy), new Text(zzzz)));
} else {
duplicatelist.put(groupkey, val);
}
}
}
}
当我 运行 我得到这个代码时
Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@20890b6f
java.lang.NullPointerException
at test.ResultType.readFields(ResultType.java)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1688)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1637)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1489)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2019)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:797)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
您得到了 NullPointerException
,因为您的自定义可写对象中的 none 个 Text
对象是在任何地方创建的。您可以在 class.
private Text xxxx = new Text();
private Text yyyy = new Text();
private Text zzzz = new Text();
我还建议您将设置它们的构造函数更改为:
public ResultType(Text xxxx, Text yyyy, Text zzzz) {
this.xxxx.set(xxxx);
this.yyyy.set(yyyy);
this.zzzz.set(zzzz);
}
与字符串不同,Text 对象不是不可变的,因此使它们相等不会创建新的 Text 对象。如果您尝试在别处重用 Text 对象,这将导致问题。