hadoop 自定义可写不产生预期的输出
hadoop custom writable not producing the expected output
我有一组来自映射器的减速器输入:
(1939, [121, 79, 83, 28])
(1980, [0, 211, −113])
我想得到如下输出:
1939 max:121 min:28 avg: 77.75
如果我不在我的减速器中使用自定义可写,我可以得到它 class:
public static class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, Text> {
Text yearlyValue = new Text();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int sum = 0;
int CounterForAvg = 0;
int minValue = Integer.MAX_VALUE;
int maxValue = Integer.MIN_VALUE;
float avg;
for (IntWritable val : values) {
int currentValue = val.get();
sum += currentValue;
CounterForAvg++;
minValue = Math.min(minValue, currentValue);
maxValue = Math.max(maxValue, currentValue);
}
avg = sum / CounterForAvg;
String requiredValue = "max temp:"+maxValue + "\t" +"avg temp: "+ avg + "\t"+ "min temp: " +minValue;
yearlyValue.set(requiredValue);
context.write(key, yearlyValue);
}
}
但是使用自定义可写 class 会产生以下结果:
1939 121
1939 79
1939 83
1939 28
1980 0
1980 211
1980 -113
以下是我如何实现自定义 class 和减速器。我将可迭代对象发送到自定义 class 并在那里执行计算。我不知道我在这里做错了什么。我在 java.
中有 0 exp
public class CompositeWritable implements Writable {
String data = "";
public CompositeWritable() {
}
public CompositeWritable(String data) {
this.data = data;
}
@Override
public void readFields(DataInput in) throws IOException {
data = WritableUtils.readString(in);
}
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeString(out, data);
}
public void merge(Iterable<IntWritable> values) {
int sum = 0;
int CounterForAvg = 0;
int minValue = Integer.MAX_VALUE;
int maxValue = Integer.MIN_VALUE;
float avg;
for (IntWritable val : values) {
int currentValue = val.get();
sum += currentValue;
CounterForAvg++;
minValue = Math.min(minValue, currentValue);
maxValue = Math.max(maxValue, currentValue);
}
avg = sum / CounterForAvg;
data = "max temp:"+maxValue + "\t" +"avg temp: "+ avg + "\t"+ "min temp: " +minValue;
}
@Override
public String toString() {
return data;
}
}
public static class MaxTemperatureReducer
extends Reducer<Text, CompositeWritable,Text, Text> {
CompositeWritable out;
Text textYearlyValue = new Text();
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
out.merge(values);
String requiredOutput = out.toString();
textYearlyValue.set(requiredOutput);
context.write(key,textYearlyValue );
}
}
我的作业配置如下:
Job job = Job.getInstance(getConf(), "MaxAvgMinTemp");
job.setJarByClass(this.getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
shouldn't the call for merge help me concat the values
当然可以,但是您没有正确使用它。 out
从未被初始化。
CompositeWritable out; // null here
Text textYearlyValue = new Text();
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
out.merge(values); // still null, should throw an exception
如果你想输出一行字符串,你可以只使用一个Text
对象。你的 merge(Iterable<IntWritable> values)
方法可以去任何地方,它不必在一个完全独立的 class 到 return 你一个 Writable 对象。
但是无论如何,如果练习是学习如何实现自定义可写,那么就开始吧。
注意事项:
- 如果你想要"compose"多个字段,那么你应该声明它们
readFields
和write
需要顺序相同
toString
确定您在使用 TextOutputFormat
(默认) 时在减速器输出中看到的内容
equals
和 hashCode
是为了完整性而添加的(理想情况下你会实现 WritableComparable
,但这实际上只对键有影响,对值没有那么重要)
- 为了与其他 Writables 相似,我将您的
merge
方法重命名为 set
。
您可以预期下面的输出看起来像
1939 MinMaxAvgWritable{min=28, max=121, avg=77.75}
1980 MinMaxAvgWritable{min=-113, max=211, avg=32.67}
public class MinMaxAvgWritable implements Writable {
private int min, max;
private double avg;
private DecimalFormat df = new DecimalFormat("#.00");
@Override
public String toString() {
return "MinMaxAvgWritable{" +
"min=" + min +
", max=" + max +
", avg=" + df.format(avg) +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MinMaxAvgWritable that = (MinMaxAvgWritable) o;
return min == that.min &&
max == that.max &&
avg == that.avg;
}
@Override
public int hashCode() {
return Objects.hash(min, max, avg);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(min);
dataOutput.writeInt(max);
dataOutput.writeDouble(avg);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.min = dataInput.readInt();
this.max = dataInput.readInt();
this.avg = dataInput.readDouble();
}
public void set(int min, int max, double avg) {
this.min = min;
this.max = max;
this.avg = avg;
}
public void set(Iterable<IntWritable> values) {
this.min = Integer.MAX_VALUE;
this.max = Integer.MIN_VALUE;
int sum = 0;
int count = 0;
for (IntWritable iw : values) {
int i = iw.get();
if (i < this.min) this.min = i;
if (i > max) this.max = i;
sum += i;
count++;
}
this.avg = count < 1 ? sum : (sum / (1.0*count));
}
}
有了这个,reducer就很简单了
public class CompositeReducer extends Reducer<Text, IntWritable, Text, MinMaxAvgWritable> {
private final MinMaxAvgWritable output = new MinMaxAvgWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// This 'set/merge' method could just as easily be defined here, and return a String to be set on a Text object
output.set(values);
context.write(key, output);
}
}
作业设置如下
// outputs for mapper and reducer
job.setOutputKeyClass(Text.class);
// setup mapper
job.setMapperClass(TokenizerMapper.class); // Replace with your mapper
job.setMapOutputValueClass(IntWritable.class);
// setup reducer
job.setReducerClass(CompositeReducer.class);
job.setOutputValueClass(MinMaxAvgWritable.class); // notice custom writable
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
我有一组来自映射器的减速器输入:
(1939, [121, 79, 83, 28])
(1980, [0, 211, −113])
我想得到如下输出:
1939 max:121 min:28 avg: 77.75
如果我不在我的减速器中使用自定义可写,我可以得到它 class:
public static class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, Text> {
Text yearlyValue = new Text();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int sum = 0;
int CounterForAvg = 0;
int minValue = Integer.MAX_VALUE;
int maxValue = Integer.MIN_VALUE;
float avg;
for (IntWritable val : values) {
int currentValue = val.get();
sum += currentValue;
CounterForAvg++;
minValue = Math.min(minValue, currentValue);
maxValue = Math.max(maxValue, currentValue);
}
avg = sum / CounterForAvg;
String requiredValue = "max temp:"+maxValue + "\t" +"avg temp: "+ avg + "\t"+ "min temp: " +minValue;
yearlyValue.set(requiredValue);
context.write(key, yearlyValue);
}
}
但是使用自定义可写 class 会产生以下结果:
1939 121
1939 79
1939 83
1939 28
1980 0
1980 211
1980 -113
以下是我如何实现自定义 class 和减速器。我将可迭代对象发送到自定义 class 并在那里执行计算。我不知道我在这里做错了什么。我在 java.
中有 0 exppublic class CompositeWritable implements Writable {
String data = "";
public CompositeWritable() {
}
public CompositeWritable(String data) {
this.data = data;
}
@Override
public void readFields(DataInput in) throws IOException {
data = WritableUtils.readString(in);
}
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeString(out, data);
}
public void merge(Iterable<IntWritable> values) {
int sum = 0;
int CounterForAvg = 0;
int minValue = Integer.MAX_VALUE;
int maxValue = Integer.MIN_VALUE;
float avg;
for (IntWritable val : values) {
int currentValue = val.get();
sum += currentValue;
CounterForAvg++;
minValue = Math.min(minValue, currentValue);
maxValue = Math.max(maxValue, currentValue);
}
avg = sum / CounterForAvg;
data = "max temp:"+maxValue + "\t" +"avg temp: "+ avg + "\t"+ "min temp: " +minValue;
}
@Override
public String toString() {
return data;
}
}
public static class MaxTemperatureReducer
extends Reducer<Text, CompositeWritable,Text, Text> {
CompositeWritable out;
Text textYearlyValue = new Text();
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
out.merge(values);
String requiredOutput = out.toString();
textYearlyValue.set(requiredOutput);
context.write(key,textYearlyValue );
}
}
我的作业配置如下:
Job job = Job.getInstance(getConf(), "MaxAvgMinTemp");
job.setJarByClass(this.getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
shouldn't the call for merge help me concat the values
当然可以,但是您没有正确使用它。 out
从未被初始化。
CompositeWritable out; // null here
Text textYearlyValue = new Text();
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
out.merge(values); // still null, should throw an exception
如果你想输出一行字符串,你可以只使用一个Text
对象。你的 merge(Iterable<IntWritable> values)
方法可以去任何地方,它不必在一个完全独立的 class 到 return 你一个 Writable 对象。
但是无论如何,如果练习是学习如何实现自定义可写,那么就开始吧。
注意事项:
- 如果你想要"compose"多个字段,那么你应该声明它们
readFields
和write
需要顺序相同toString
确定您在使用TextOutputFormat
(默认) 时在减速器输出中看到的内容
equals
和hashCode
是为了完整性而添加的(理想情况下你会实现WritableComparable
,但这实际上只对键有影响,对值没有那么重要)- 为了与其他 Writables 相似,我将您的
merge
方法重命名为set
。
您可以预期下面的输出看起来像
1939 MinMaxAvgWritable{min=28, max=121, avg=77.75}
1980 MinMaxAvgWritable{min=-113, max=211, avg=32.67}
public class MinMaxAvgWritable implements Writable {
private int min, max;
private double avg;
private DecimalFormat df = new DecimalFormat("#.00");
@Override
public String toString() {
return "MinMaxAvgWritable{" +
"min=" + min +
", max=" + max +
", avg=" + df.format(avg) +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MinMaxAvgWritable that = (MinMaxAvgWritable) o;
return min == that.min &&
max == that.max &&
avg == that.avg;
}
@Override
public int hashCode() {
return Objects.hash(min, max, avg);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(min);
dataOutput.writeInt(max);
dataOutput.writeDouble(avg);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.min = dataInput.readInt();
this.max = dataInput.readInt();
this.avg = dataInput.readDouble();
}
public void set(int min, int max, double avg) {
this.min = min;
this.max = max;
this.avg = avg;
}
public void set(Iterable<IntWritable> values) {
this.min = Integer.MAX_VALUE;
this.max = Integer.MIN_VALUE;
int sum = 0;
int count = 0;
for (IntWritable iw : values) {
int i = iw.get();
if (i < this.min) this.min = i;
if (i > max) this.max = i;
sum += i;
count++;
}
this.avg = count < 1 ? sum : (sum / (1.0*count));
}
}
有了这个,reducer就很简单了
public class CompositeReducer extends Reducer<Text, IntWritable, Text, MinMaxAvgWritable> {
private final MinMaxAvgWritable output = new MinMaxAvgWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// This 'set/merge' method could just as easily be defined here, and return a String to be set on a Text object
output.set(values);
context.write(key, output);
}
}
作业设置如下
// outputs for mapper and reducer
job.setOutputKeyClass(Text.class);
// setup mapper
job.setMapperClass(TokenizerMapper.class); // Replace with your mapper
job.setMapOutputValueClass(IntWritable.class);
// setup reducer
job.setReducerClass(CompositeReducer.class);
job.setOutputValueClass(MinMaxAvgWritable.class); // notice custom writable
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;