为什么Mapper的outputkey/Value需要和Combiner的Outputkey/value一致

Why output key/Value of Mapper needs to be same as that of Output key/value ofCombiner

我正在尝试学习 MapReduce 并且对它很陌生。我研究了 Combiner 通过减少数据节点级别本身的映射器输出而提供的优化。

现在,映射器输出 key/val 和组合器输入 key/value 需要相同是可以理解的。但我无法理解组合器输出 key/value 和映射器输出 key/Val 需要相同的事实。

如果我想找到名称,价格形式的数据的平均值,那么我可能会选择以下:

Mapper<LongWritable, Text, Text, IntWritable>
Combiner<Text, IntWritable, Text, FloatWritable>
Reducer<Text, IntWritable, Text, FloatWritable>

通过这样做我得到了错误,当我在线阅读时我发现 Mapper 和 Combiner 的输出需要相同但找不到​​原因。

下面是我的示例数据:

Schema - cid,cname,email,date,pid,pname,price
101,jai,j@j.com,1-aug-2016,1,iphone,65000
101,jai,j@j.com,1-aug-2016,2,ipad,35000
101,jai,j@j.com,1-aug-2016,3,Samsung S5,34000

下面是我的代码:

import java.io.IOException;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;


 
public class q1 {
    //cid,cname,email,date,pid,pname,price
    
    public static class avg_mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String[] line = value.toString().split(",");
            Text cname = new Text(line[1]);
            IntWritable price = new IntWritable(Integer.parseInt(line[6]));
            context.write(cname, price);
        }
    }
    public static class avg_reducer extends Reducer<Text, IntWritable, Text, FloatWritable>{
        public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException{
            int sum = 0;
            int count=0;
            for (IntWritable val : value){
                count+=1;
                sum+=val.get();
            }
            Float avg = (float)sum/count;
            context.write(key,new FloatWritable(avg));
        }
    }
        
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Average");
        job.setJarByClass(q1.class);
        job.setMapperClass(avg_mapper.class);
        job.setReducerClass(avg_reducer.class);
        job.setCombinerClass(avg_reducer.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0:1);
        
        
        
    }

}

下面是我得到的错误:

Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.FloatWritable is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1374)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1691)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at q1$avg_reducer.reduce(q1.java:34)
at q1$avg_reducer.reduce(q1.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1712)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1641)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1492)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:729)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

我正在尝试查找每个 cname 的平均价格。感谢任何帮助。

But I can't digest the fact that combiner output key/value and mapper output key/Val needs to be the same.

很简单,reducer 的输入类型没有改变,所以在你的情况下它总是 (Text, IntWritable)。 reducer 不关心这个输入是如何提供给它的。它总是希望输入是 (Text, IntWritable) 类型,所以 mapper 和 combiner 的输出应该是相同的并且应该是 (Text, IntWritable).

但是你应该知道的第一件事是你永远不应该把你的应用程序的一些逻辑放在组合器中 mapreduce.There 当 Hadoop 是 运行宁你的工作。而且 Hadoop 在执行作业时可能 运行 组合器不止一次。

那么combiner的作用是什么?

combiner 的唯一目标是减少从执行 mapper 任务的机器发送到执行 运行 reducer 任务的机器的数据量。如果你想编写一个组合器,你应该以这样的方式设计它,即这个组合器在 mapreeduce 中执行的次数不会影响你的应用程序的输出。

现在暂时认为您已经更改了地图的输出类型,因此它可以是 运行 而不会出错。您的申请还有其他问题吗?绝对是。

假设您有这样的输入:

101,jai,j@j.com,1-aug-2016,1,iphone,65000
101,jai,j@j.com,1-aug-2016,2,ipad,35000
101,jai,j@j.com,1-aug-2016,3,Samsung S5,34000

所以地图输出将是这样的:

jai -> 65000
jai -> 35000
jai -> 34000

现在想象两种不同场景下的 reducer 输入:

第一个场景组合器根本没有执行:

jai -> 65000
jai -> 35000
jai -> 34000

在这种情况下,reducer 输出将是:

jai -> 44666.666666666664

第二种情况组合器在映射器输出中的两个第一个元素上执行:

jai -> 50000 // combiner executed on the first two item above and produce jai -> (65000 + 35000) / 2

jai -> 34000 // the third is sent to the reducer without combiner executed on it

在这种情况下,reducer 的输出将是:

jai -> 67000 // (50000 + 34000) / 2

很明显,您的申请结果将取决于组合器的执行次数。

一种解决方法是将权重分配给发送到减速器和组合器的值,例如对于减速器输出上方的相同输入,将如下所示:

jai -> 1-65000 // this shows both weigh and value separated by dash(-)
jai -> 1-35000
jai -> 1-34000

现在想象一下组合器根本不执行的第一个场景:

在这种情况下,reducer 输入将是上面 mapper 的输出,因此 reducer 的输出将是:

jai -> 3-44666.666666666664

第二种情况是在两个第一个元素上执行组合器,因此组合器输出将如下所示:

jai -> 2-50000 // this is jai -> 2 - (65000 + 35000) / 2
jai -> 1-34000

所以reducer输出将是:

jai -> 3-44666.666666666664 //   3 - (2 * 50000) + (1 * 34000) / 3

这样无论您的组合器将 运行 应用多少次,您的应用程序的输出将始终相同。

实施:

有很多方法可以在 mapreduce 中实现这个解决方案。您可以定义自己的 Writable 类型来保存权重和平均值,也可以使用简单文本并用短划线字符 (-) 分隔它们。为了简单起见,我选择了第二个。

这是映射器实现:

public class AverageMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] line = value.toString().split(",");
        Text cname = new Text(line[1]);
        context.write(cname, new Text(1 + "-" + String.valueOf(line[6])));
    }
}

这里是减速器实现:

public class AverageReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0D;
        long count = 0L;
        long elementCount;
        for(Text value : values) {
            String str = new String(value.copyBytes());
            String[] result = str.split("-");
            elementCount = Long.valueOf(result[0]);
            count += elementCount;
            sum += elementCount * Double.valueOf(result[1]);
        }
        context.write(key, new Text(String.valueOf(count + "-" + (sum / count))));
    } 
}

请注意,当组合器执行不同时间时,有时结果之间会有微小差异(由于浮点舍入问题),但这是可以接受的,不会有显着差异。