Mapper Class and Reducer Class in MapReduce Design Pattern
I am new to MapReduce and I have a few questions about this code in the design of the Mapper class and the Reducer class.

I am familiar with map-side joining in MapReduce, and I have learned this:
public static class CustsMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
Here, in the snippet above, I understand that we extend our class from the Mapper class; Object is the key type and Text is the value type, so the map method takes that key-value pair as its input, and the context object is what produces the output, e.g. context.write(new Text(), new Text()), according to the logic written in the method body.
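For illustration, a complete mapper in that style might look like the sketch below; the class name, the comma-split and the emitted key/value are made up here just to show where context.write goes, they are not the real CustsMapper logic:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical stand-alone mapper in the same style (new mapreduce API);
// nested inside a driver class it would be declared "public static class".
public class ExampleMapper extends Mapper<Object, Text, Text, Text>
{
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        // e.g. an input record "1,John,Montgomery" becomes key "1" and value "John,Montgomery"
        String[] parts = value.toString().split(",", 2);
        if (parts.length == 2)
        {
            // the Context object is what emits the map output
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}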
My two questions are:

1. Why do we extend our class from MapReduceBase (what does it do?), and why do we implement Mapper? (I know it is a class, but somewhere on the web it is shown as an interface, so what is the problem if I instead extend my class from the org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> class?)

2. In the map function, what are OutputCollector<Text, IntWritable> output and Reporter reporter? I do not know what they are. I know that Context context should be here, but what are OutputCollector and Reporter doing here?
I am executing the program given below:

Input:
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Code:
package hadoop;

import java.util.*;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
    // Mapper class (old mapred API: extends MapReduceBase and implements the Mapper interface)
    public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
    {
        // Map function: emits (year, last reading of the line)
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
        {
            String line = value.toString();
            String lasttoken = null;
            StringTokenizer s = new StringTokenizer(line, "\t");
            String year = s.nextToken();
            while (s.hasMoreTokens())
            {
                lasttoken = s.nextToken();
            }
            int avgprice = Integer.parseInt(lasttoken);
            output.collect(new Text(year), new IntWritable(avgprice));
        }
    }

    // Reducer class
    public static class E_EReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
    {
        // Reduce function: emits every value greater than 30
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
        {
            int maxavg = 30;
            int val = Integer.MIN_VALUE;
            while (values.hasNext())
            {
                if ((val = values.next().get()) > maxavg)
                {
                    output.collect(key, new IntWritable(val));
                }
            }
        }
    }

    // Main function: configures and submits the job with the old JobConf/JobClient API
    public static void main(String args[]) throws Exception
    {
        JobConf conf = new JobConf(ProcessUnits.class);
        conf.setJobName("max_eletricityunits");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(E_EMapper.class);
        conf.setCombinerClass(E_EReduce.class);
        conf.setReducerClass(E_EReduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
Output:
1981 34
1984 40
1985 45
Why do we extend our class from MapReduceBase (what does it do?), and why do we implement Mapper?
Because that is old code, written with the mapred API from before Hadoop 2.x came out.
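For comparison, here is a rough sketch of the E_EMapper above rewritten against the newer org.apache.hadoop.mapreduce API: the class is extended rather than implemented, and a single Context object takes over the jobs of OutputCollector and Reporter (the class name here is just illustrative):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class E_EMapperNewApi extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        StringTokenizer s = new StringTokenizer(value.toString(), "\t");
        String year = s.nextToken();
        String lasttoken = null;
        while (s.hasMoreTokens())
        {
            lasttoken = s.nextToken();
        }
        // context.write replaces output.collect from the old API
        context.write(new Text(year), new IntWritable(Integer.parseInt(lasttoken)));
    }
}

The driver changes in the same spirit: org.apache.hadoop.mapreduce.Job (Job.getInstance(conf), job.setMapperClass(...), job.waitForCompletion(true)) replaces JobConf and JobClient.runJob(conf).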
I know that Context context should be here, but what are OutputCollector and Reporter here?
They are the earlier equivalents of the Context object: in the old mapred API, OutputCollector is used to emit key-value pairs and Reporter to report progress, status and counters; the new API merges both roles into Context.
See also: Hadoop: How does OutputCollector work during MapReduce? and How outputcollector works?
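And the reduce side, for the same kind of comparison, looks like the sketch below: values arrive as an Iterable instead of an Iterator, and Context again stands in for the OutputCollector/Reporter pair (class name illustrative):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class E_EReduceNewApi extends Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int maxavg = 30;
        for (IntWritable value : values)
        {
            int val = value.get();
            if (val > maxavg)
            {
                // context.write replaces output.collect; counters and status updates
                // would also go through context instead of a separate Reporter
                context.write(key, new IntWritable(val));
            }
        }
    }
}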