MapReduce Hadoop on Linux - Change Reduce Key
I've been searching the web for a proper tutorial on how to use map and reduce, but almost every piece of code about WordCount just sucks and doesn't really explain how to use each function. I've understood everything about the theory, the keys, the maps and so on, but there is no CODE that, for example, does something different from WordCount.
I'm using Ubuntu 20.10 on VirtualBox and Hadoop version 3.2.1 (comment if you need more information).
My task is to process a file that holds several pieces of data about the athletes that took part in the Olympics.
You will see that it contains all kinds of information, like name, sex, age, weight, height, etc.
I'll show an example here (hoping it's clear):
ID Name Sex Age Height Weight Team NOC Games Year Season City Sport Event Medal
1 A Dijiang M 24 180 80 China CHN 1992 Summer 1992 Summer Barcelona Basketball Basketball Men's Basketball NA
Until now I have had to deal with data that is the same in every one of an athlete's records, like the name or the ID, which resemble each other.
(My problem is that a participant can appear more than once, in different time periods, so reduce can't recognise those records as belonging to the same athlete.)
If I could change the key/identifier of the reduce function to the participant's name, then I should get the correct results.
In this code I look for the players that have won at least one medal.
My main is:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewWordCount {
    public static void main(String[] args) throws Exception {
        if(args.length != 3) {
            System.err.println("Give the correct arguments.");
            System.exit(3);
        }

        // Job 1.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(NewWordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(NewWordMapper.class);
        job.setCombinerClass(NewWordReducer.class);
        job.setReducerClass(NewWordReducer.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
My Mapper is:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewWordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable();
    private Text word = new Text();

    private String name = new String();
    private String sex = new String();
    private String age = new String();
    private String team = new String();
    private String sport = new String();
    private String games = new String();
    private String sum = new String();

    private String gold = "Gold";
    private String silver = "Silver";
    private String bronze = "Bronze";

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if(((LongWritable)key).get() == 0) {
            return;
        }

        String line = value.toString();
        String[] arrOfStr = line.split(",");
        int counter = 0;

        for(String a : arrOfStr) {
            if(counter == 14) {
                // setting the type of medal each player has won.
                word.set(a);

                // checking if the medal is gold.
                if(a.compareTo(gold) == 0 || a.compareTo(silver) == 0 || a.compareTo(bronze) == 0) {
                    String[] goldenStr = line.split(",");
                    name = goldenStr[1];
                    sex = goldenStr[2];
                    age = goldenStr[3];
                    team = goldenStr[6];
                    sport = goldenStr[12];
                    games = goldenStr[8];
                    sum = name + "," + sex + "," + age + "," + team + "," + sport + "," + games;
                    word.set(sum);
                    context.write(word, one);
                }
            }
            counter++;
        }
    }
}
My Reducer is:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewWordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for(IntWritable val : values) {
            String line = val.toString();
            String[] arrOfStr = line.split(",");
            String name = arrOfStr[0];
            count += val.get();
        }
        context.write(key, new IntWritable(count));
    }
}
The core idea behind a MapReduce job is that the Map function is used to extract valuable information from the input and "translate" it into key-value pairs, where, based on their keys, the Reduce function is executed separately for each key. Your code seems to show a misconception about the way the latter is executed, but that's no big deal, because this is a good chance for an example that actually differs from WordCount.
Let's say we have a file with the Olympic athletes' stats and their medal performances, just like the one you showed, under a directory named /olympic_stats in HDFS (notice that I included records referring to the same athlete, which is exactly what your case needs to handle):
1,A Dijiang,M,24,180,80,China,CHN,1992,Summer 1992,Summer,Barcelona,Basketball,Men's Basketball,NA
2,T Kekempourdas,M,33,189,85,Greece,GRE,2004,Summer 2004,Summer,Athens,Judo,Men's Judo,Gold
3,T Kekempourdas,M,33,189,85,Greece,GRE,2000,Summer 2000,Summer,Sydney,Judo,Men's Judo,Bronze
4,K Stefanidi,F,29,183,76,Greece,GRE,2016,Summer 2016,Summer,Rio,Pole Vault, Women's Pole Vault,Silver
5,A Jones,F,26,160,56,Canada,CAN,2012,Summer 2012,Summer,London,Acrobatics,Women's Acrobatics,Gold
5,A Jones,F,26,160,56,Canada,CAN,2016,Summer 2012,Summer,Rio,Acrobatics,Women's Acrobatics,Gold
6,C Glover,M,33,175,80,USA,USA,2008,Summer 2008,Summer,Beijing,Archery,Men's Archery,Gold
7,C Glover,M,33,175,80,USA,USA,2012,Summer 2012,Summer,London,Archery,Men's Archery,Gold
8,C Glover,M,33,175,80,USA,USA,2016,Summer 2016,Summer,Rio,Archery,Men's Archery,Gold
For the Map function we need to find a column of the data that is good to use as a key, in order to count how many gold medals each athlete has. From the data above we can easily see that every athlete can have one or more records, and every record carries his/her name in the second column, so we can be sure that we are going to use the name as the key of the key-value pairs. As for the value, we want to count how many gold medals an athlete has won, so we have to check the medal column (columns[14] in the code), which indicates whether this athlete won a medal and, if so, which one. If that column of a record equals the String Gold, then we can be sure this athlete has at least 1 gold medal in his/her career so far. So here, as the value, we can simply put 1.
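To make that concrete, for the sample records above the mapper would emit one pair per gold-medal record, roughly like this (the order before the shuffle doesn't matter):
(T Kekempourdas, 1)
(A Jones, 1)
(A Jones, 1)
(C Glover, 1)
(C Glover, 1)
(C Glover, 1)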
Now for the Reduce function: since it is executed individually for each distinct key, we know that all the input values it receives from the mapper refer to one and the same athlete. And since the key-value pairs generated by the mapper carry a 1 as the value for every gold medal of a given athlete, we can simply add all those 1s up and get the total number of gold medals for each of them.
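Continuing the sketch above, after the shuffle each reduce call receives one athlete's name together with all the 1s emitted for him/her, so the sums should come out roughly as:
(A Jones, [1, 1])        -> 2
(C Glover, [1, 1, 1])    -> 3
(T Kekempourdas, [1])    -> 1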
So the code for this looks like the one below (I put the mapper, the reducer and the driver in the same file for simplicity's sake):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class GoldMedals
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <Athlete's Name, 1>
     */
    public static class Map extends Mapper<Object, Text, Text, IntWritable>
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String record = value.toString();
            String[] columns = record.split(",");

            // extract the athlete's name and his/hers medal indication
            String athlete_name = columns[1];
            String medal = columns[14];

            // only hold the gold medal athletes, with their name as the key
            // and 1 as the least number of gold medals they have so far
            if(medal.equals("Gold"))
                context.write(new Text(athlete_name), new IntWritable(1));
        }
    }

    /* input:  <Athlete's Name, 1>
     * output: <Athlete's Name, Athlete's Total Gold Medals>
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;

            // for a single athlete, add all of the gold medals they had so far...
            for(IntWritable value : values)
                sum += value.get();

            // and write the result as the value on the output key-value pairs
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("olympic_stats");
        Path output_dir = new Path("gold_medals");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job goldmedals_job = Job.getInstance(conf, "Gold Medals Counter");
        goldmedals_job.setJarByClass(GoldMedals.class);
        goldmedals_job.setMapperClass(Map.class);
        goldmedals_job.setCombinerClass(Reduce.class);
        goldmedals_job.setReducerClass(Reduce.class);
        goldmedals_job.setMapOutputKeyClass(Text.class);
        goldmedals_job.setMapOutputValueClass(IntWritable.class);
        goldmedals_job.setOutputKeyClass(Text.class);
        goldmedals_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(goldmedals_job, input_dir);
        FileOutputFormat.setOutputPath(goldmedals_job, output_dir);
        goldmedals_job.waitForCompletion(true);
    }
}
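For completeness, here is a rough sketch of how this could be compiled and run (assuming Hadoop 3.2.1, the class saved as GoldMedals.java, and the standard Hadoop tutorial workflow; adjust names and paths to your own setup):
hadoop com.sun.tools.javac.Main GoldMedals.java
jar cf GoldMedals.jar GoldMedals*.class
hadoop jar GoldMedals.jar GoldMedals
Note that the driver hard-codes olympic_stats as the input path and gold_medals as the output path, so no command-line arguments are needed.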
The output of the program above is stored under the job's output directory in HDFS (gold_medals in the code above), and its contents confirm that the MapReduce job was designed correctly.
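Based on the sample records above, the output (viewed, for example, with hdfs dfs -cat gold_medals/part-r-00000) should look roughly like this:
A Jones	2
C Glover	3
T Kekempourdas	1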