Mapper Class and Reducer Class in MapReduce Design Pattern

I am new to MapReduce, and I have some doubts about this code in the design of the Mapper class and the Reducer class.

I am familiar with map-side joining in MapReduce, and I learned that:

public static class CustsMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

Here, in the code snippet above, I understand that we extend our class from the Mapper class: Object is the input key type and Text is the input value type, so the map method takes this key-value pair as input, and the context object is what writes the output, e.g. context.write(new Text(), new Text()), according to the logic in the method body.
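
For reference, here is a minimal sketch of how I picture such a mapper once it is completed (the record layout and the splitting logic below are only illustrative assumptions, not the original join code):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API mapper: extend the Mapper class directly.
public class CustsMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed record layout "custId,rest-of-record": emit (custId, rest)
        // so that records sharing a customer id meet at the same reducer.
        String[] parts = value.toString().split(",", 2);
        if (parts.length == 2) {
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}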

My two questions are:

  1. Why do we extend our class from MapReduceBase (what does it do?), and why do we implement the Mapper interface on our class? (I know Mapper is a class, but somewhere on the web it is shown as an interface.) So what is wrong with extending the org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> class instead?

  2. What are OutputCollector<Text, IntWritable> output and Reporter reporter in the map function? I knew that Context context should be here, but what are OutputCollector and Reporter doing here?

I am running the program given below:

Input:

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

Code:

package hadoop; 

import java.util.*; 

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits 
{ 
   //Mapper class: emits (year, value of the last column) for each input line
   public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line, "\t");
         String year = s.nextToken();      // first column is the year

         while (s.hasMoreTokens())
         {
            lasttoken = s.nextToken();     // keep only the last column
         }

         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }


   //Reducer class: keeps only the values above a fixed threshold
   public static class E_EReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
   {
      //Reduce function
      public void reduce(Text key, Iterator<IntWritable> values,
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
         {
            int maxavg = 30;               // threshold: emit only values above this
            int val = Integer.MIN_VALUE;

            while (values.hasNext())
            {
               if ((val = values.next().get()) > maxavg)
               {
                  output.collect(key, new IntWritable(val));
               }
            }
         }
   }


   //Main function: configures and submits the job via the old JobConf/JobClient API
   public static void main(String args[])throws Exception 
   { 
      JobConf conf = new JobConf(ProcessUnits.class); 

      conf.setJobName("max_electricityunits");
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 

      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

      JobClient.runJob(conf); 
   } 
} 

Output:

1981    34 
1984    40 
1985    45 

Why do we extend our class from MapReduceBase (what does it do?), and why do we implement the Mapper interface on our class?

Because that is old code, written with the mapred API from before Hadoop 2.x came out.
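
If you want to write the same mapper against the current org.apache.hadoop.mapreduce API, a rough sketch of the port looks like this (untested; the point is the structural difference: you extend the Mapper class instead of extending MapReduceBase and implementing the Mapper interface):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API port of E_EMapper: no MapReduceBase, no Mapper interface.
public class E_EMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer s = new StringTokenizer(value.toString(), "\t");
        String year = s.nextToken();       // first column is the year
        String lasttoken = null;
        while (s.hasMoreTokens()) {
            lasttoken = s.nextToken();     // keep the last column
        }
        // context.write(...) replaces output.collect(...)
        context.write(new Text(year), new IntWritable(Integer.parseInt(lasttoken)));
    }
}

The driver changes in the same spirit; a sketch (assuming E_EReduce has also been ported to org.apache.hadoop.mapreduce.Reducer in the same way):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProcessUnits {
    public static void main(String[] args) throws Exception {
        // Job replaces JobConf; waitForCompletion replaces JobClient.runJob.
        Job job = Job.getInstance(new Configuration(), "max_electricityunits");
        job.setJarByClass(ProcessUnits.class);
        job.setMapperClass(E_EMapper.class);
        job.setReducerClass(E_EReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}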

I knew that Context context should be here, but what are OutputCollector and Reporter here?

They are the previous version of the context object: OutputCollector was how you emitted output pairs, and Reporter was how you reported progress and updated counters. In the newer API, Context takes over both roles.
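
Concretely, the correspondence looks roughly like this (a schematic comparison reusing the names from your code, not complete code):

// Old mapred API: two separate objects are passed into map()/reduce()
output.collect(new Text(year), new IntWritable(avgprice)); // emit a pair
reporter.progress();                                       // report liveness

// New mapreduce API: Context covers both roles (and carries the job config)
context.write(new Text(year), new IntWritable(avgprice));  // emit a pair
context.progress();                                        // report liveness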

See also: "Hadoop: How does OutputCollector work during MapReduce?" and "How outputcollector works?"