Linux 上的 MapReduce Hadoop - 输入多个数据
MapReduce Hadoop on Linux - Multiple data on input
我在 VirtualBox 上使用 Ubuntu 20.10,Hadoop 版本为 3.2.1(如果您需要更多信息,请在评论中告诉我)。
我此时的输出给了我这个:
Aaron Wells Peirsol ,M,17,United States,Swimming,2000 Summer,0,1,0
Aaron Wells Peirsol ,M,21,United States,Swimming,2004 Summer,1,0,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,0,1,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,1,0,0
对于上面的输出,我希望能够总结他所有的奖牌
(字符串末尾的三个数字分别代表金、银、铜
参赛者历年在奥运会上获得的奖牌)。
题目不要求保留每届的年龄 (17,21,25,25) 或具体届次(2000、2004、2008 夏季等),但我必须把各届的奖牌数相加,
以便之后能够按金牌总数等指标对参赛者进行排序。
有什么想法吗?如果您需要,我可以为您提供我的代码,但我需要另一个 MapReduce,我想它将使用我在上面导入的给定输入并给我们类似的东西:
Aaron Wells Peirsol,M,25,United States,Swimming,2008 Summer,2,2,0
如果我们有办法从 reduce 输出中删除“\t”,那也会非常有益!
Gyftonikolos Nikolaos,谢谢大家的宝贵时间。
虽然一开始看起来有点棘手,但这其实是 WordCount 示例的又一个变体,只是这次需要使用复合键和复合值,才能以 key-value
对的形式组织数据。
对于映射器,我们需要从输入文件的每一行中提取所有信息,并将列中的数据分为两个“类别”:
key
每位运动员的主要信息始终相同
- 逐行更改且需要编辑的统计信息
对于每个运动员的行,我们知道永远不会改变的列是运动员的姓名、性别、国家和运动项目。通过使用 ,
字符作为每种数据类型之间的分隔符,所有这些都将被视为 key
。其余列数据将放在 key-value
对的值侧,但我们也需要在它们上面使用分隔符,以便首先区分每个年龄和奥运会年份的奖牌计数器。我们将使用:
@
字符作为年龄和年份之间的分隔符,
#
字符作为奖牌计数器之间的分隔符,
- 以及
_
字符作为前两者(年龄@年份 与 奖牌计数)之间的分隔符。
在 Reduce
函数中,我们实际上要做的就是计算奖牌总数,并找到每位运动员的最新年龄和年份。
为了不在MapReduce作业输出的键和值之间有制表符,我们可以简单地将NULL
设置为key-value
对由 reducer 生成,并将计算的所有数据放在每对的值中,使用 ,
字符作为分隔符。
此作业的代码如下所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;
public class Medals
{
    /* Mapper.
     * input:  <byte_offset, line_of_dataset>
     * output: <(name,sex,country,sport), (age@year_gold#silver#bronze)>
     *
     * The athlete's invariant columns form the composite key; the per-games
     * columns (age, games year, medal flags) form the composite value.
     */
    public static class Map extends Mapper<Object, Text, Text, Text>
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String record = value.toString();
            String[] columns = record.split(",");
            // Guard against malformed or short lines so that a single bad
            // record does not crash the whole job with an index error.
            if(columns.length < 9)
                return;
            // trim() prevents stray whitespace (the sample data contains
            // "Aaron Wells Peirsol " with a trailing space) from splitting
            // one athlete into two reduce groups, and makes the final output
            // match the requested "name,sex,..." format exactly.
            String name = columns[0].trim();
            String sex = columns[1].trim();
            String country = columns[3].trim();
            String sport = columns[4].trim();
            // Per-games statistics go to the value side of the pair.
            String age = columns[2].trim();
            String year = columns[5].trim();
            String gold = columns[6].trim();
            String silver = columns[7].trim();
            String bronze = columns[8].trim();
            context.write(new Text(name + "," + sex + "," + country + "," + sport),
                          new Text(age + "@" + year + "_" + gold + "#" + silver + "#" + bronze));
        }
    }
    /* Reducer.
     * input:  <(name,sex,country,sport), [age@year_gold#silver#bronze, ...]>
     * output: <NULL, (name,sex,latest_age,country,sport,latest_games,golds,silvers,bronzes)>
     *
     * NullWritable as the output key avoids the tab separator between key and
     * value in the job's text output, as the asker requested.
     */
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            // Recover the athlete's invariant info from the composite key.
            String[] athlete_info = key.toString().split(",");
            String name = athlete_info[0];
            String sex = athlete_info[1];
            String country = athlete_info[2];
            String sport = athlete_info[3];
            int latest_age = 0;
            String latest_games = "";
            int gold_cnt = 0;
            int silver_cnt = 0;
            int bronze_cnt = 0;
            for(Text value : values)
            {
                String[] split_value = value.toString().split("_");
                String[] age_and_year = split_value[0].split("@");
                String[] medals = split_value[1].split("#");
                int age = Integer.parseInt(age_and_year[0]);
                // Keep the most recent age and games seen for this athlete.
                if(age > latest_age)
                {
                    latest_age = age;
                    latest_games = age_and_year[1];
                }
                // Sum the actual medal counts instead of testing "== 1", so a
                // row recording more than one medal of a kind is not undercounted.
                gold_cnt += Integer.parseInt(medals[0]);
                silver_cnt += Integer.parseInt(medals[1]);
                bronze_cnt += Integer.parseInt(medals[2]);
            }
            context.write(NullWritable.get(),
                          new Text(name + "," + sex + "," + latest_age + "," + country + "," + sport + ","
                                   + latest_games + "," + gold_cnt + "," + silver_cnt + "," + bronze_cnt));
        }
    }
    public static void main(String[] args) throws Exception
    {
        // HDFS paths default to the original hard-coded directories, but may be
        // overridden from the command line: hadoop jar ... Medals <input> <output>
        Path input_dir = new Path(args.length > 0 ? args[0] : "olympic_stats");
        Path output_dir = new Path(args.length > 1 ? args[1] : "medals");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Delete a pre-existing output directory, otherwise the job refuses to start.
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);
        // Configure the MapReduce job.
        Job medals_job = Job.getInstance(conf, "Medals Counter");
        medals_job.setJarByClass(Medals.class);
        medals_job.setMapperClass(Map.class);
        medals_job.setReducerClass(Reduce.class);
        medals_job.setMapOutputKeyClass(Text.class);
        medals_job.setMapOutputValueClass(Text.class);
        medals_job.setOutputKeyClass(NullWritable.class);
        medals_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(medals_job, input_dir);
        FileOutputFormat.setOutputPath(medals_job, output_dir);
        // Propagate job success/failure through the process exit code.
        System.exit(medals_job.waitForCompletion(true) ? 0 : 1);
    }
}
当然,结果就是您希望的结果,如下所示:
我在 Virtual Box 和 Hadoop 版本 3.2.1 上使用 Ubuntu 20.10(如果您需要更多信息,请评论我)。
我此时的输出给了我这个:
Aaron Wells Peirsol ,M,17,United States,Swimming,2000 Summer,0,1,0
Aaron Wells Peirsol ,M,21,United States,Swimming,2004 Summer,1,0,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,0,1,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,1,0,0
对于上面的输出,我希望能够总结他所有的奖牌
(字符串末尾的三个数字分别代表金、银、铜
参赛者历年在奥运会上获得的奖牌)。
该项目没有指定年龄 (17,21,25,25)
或者当它发生时(2000,2004,2008,2008 夏天),但我必须添加奖牌
为了能够按获得最多金牌的参与者等对它们进行排序
有什么想法吗?如果您需要,我可以为您提供我的代码,但我需要另一个 MapReduce,我想它将使用我在上面导入的给定输入并给我们类似的东西:
Aaron Wells Peirsol,M,25,United States,Swimming,2008 Summer,2,2,0
如果我们有办法从 reduce 输出中删除“\t”,那也会非常有益!
Gyftonikolos Nikolaos,谢谢大家的宝贵时间。
虽然一开始看起来有点棘手,但这是 WordCount 示例的另一个例子,只是这次需要复合键和值,以便以 key-value
对。
对于映射器,我们需要从输入文件的每一行中提取所有信息,并将列中的数据分为两个“类别”:
key
每位运动员的主要信息始终相同
- 逐行更改且需要编辑的统计信息
对于每个运动员的行,我们知道永远不会改变的列是运动员的姓名、性别、国家和运动项目。通过使用 ,
字符作为每种数据类型之间的分隔符,所有这些都将被视为 key
。其余列数据将放在 key-value
对的值侧,但我们也需要在它们上面使用分隔符,以便首先区分每个年龄和奥运会年份的奖牌计数器。我们将使用:
@
字符作为年龄和年份之间的分隔符,#
字符作为奖牌计数器之间的分隔符,- 和
_
字符作为这两者之间的分隔符
在 Reduce
函数中,我们实际上要做的就是计算奖牌总数,并找到每位运动员的最新年龄和年份。
为了不在MapReduce作业输出的键和值之间有制表符,我们可以简单地将NULL
设置为key-value
对由 reducer 生成,并将计算的所有数据放在每对的值中,使用 ,
字符作为分隔符。
此作业的代码如下所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;
public class Medals
{
    /* Mapper.
     * input:  <byte_offset, line_of_dataset>
     * output: <(name,sex,country,sport), (age@year_gold#silver#bronze)>
     *
     * Columns that never change for an athlete become the composite key;
     * the per-games columns become the composite value.
     */
    public static class Map extends Mapper<Object, Text, Text, Text>
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] columns = value.toString().split(",");
            // Skip malformed or short lines instead of failing the whole job
            // with an ArrayIndexOutOfBoundsException.
            if(columns.length < 9)
                return;
            // trim() removes stray whitespace such as the trailing space in
            // "Aaron Wells Peirsol " from the sample data; without it the same
            // athlete could land in two different reduce groups.
            String name = columns[0].trim();
            String sex = columns[1].trim();
            String country = columns[3].trim();
            String sport = columns[4].trim();
            String age = columns[2].trim();
            String year = columns[5].trim();
            String gold = columns[6].trim();
            String silver = columns[7].trim();
            String bronze = columns[8].trim();
            // Main info as key, per-games stats as value.
            context.write(new Text(name + "," + sex + "," + country + "," + sport),
                          new Text(age + "@" + year + "_" + gold + "#" + silver + "#" + bronze));
        }
    }
    /* Reducer.
     * input:  <(name,sex,country,sport), [age@year_gold#silver#bronze, ...]>
     * output: <NULL, (name,sex,latest_age,country,sport,latest_games,golds,silvers,bronzes)>
     *
     * Writing NullWritable as the key removes the tab that would otherwise
     * separate key and value in the job's text output.
     */
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            String[] athlete_info = key.toString().split(",");
            String name = athlete_info[0];
            String sex = athlete_info[1];
            String country = athlete_info[2];
            String sport = athlete_info[3];
            int latest_age = 0;
            String latest_games = "";
            int gold_cnt = 0;
            int silver_cnt = 0;
            int bronze_cnt = 0;
            // Accumulate totals and track the most recent age/games.
            for(Text value : values)
            {
                String[] split_value = value.toString().split("_");
                String[] age_and_year = split_value[0].split("@");
                String[] medals = split_value[1].split("#");
                int age = Integer.parseInt(age_and_year[0]);
                if(age > latest_age)
                {
                    latest_age = age;
                    latest_games = age_and_year[1];
                }
                // Sum the counters rather than testing "== 1", so rows that
                // record more than one medal of a kind are counted correctly.
                gold_cnt += Integer.parseInt(medals[0]);
                silver_cnt += Integer.parseInt(medals[1]);
                bronze_cnt += Integer.parseInt(medals[2]);
            }
            context.write(NullWritable.get(),
                          new Text(name + "," + sex + "," + latest_age + "," + country + "," + sport + ","
                                   + latest_games + "," + gold_cnt + "," + silver_cnt + "," + bronze_cnt));
        }
    }
    public static void main(String[] args) throws Exception
    {
        // Default to the original hard-coded HDFS directories, while allowing
        // an optional command-line override: hadoop jar ... Medals <input> <output>
        Path input_dir = new Path(args.length > 0 ? args[0] : "olympic_stats");
        Path output_dir = new Path(args.length > 1 ? args[1] : "medals");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // A pre-existing output directory makes the job fail at startup; remove it.
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);
        Job medals_job = Job.getInstance(conf, "Medals Counter");
        medals_job.setJarByClass(Medals.class);
        medals_job.setMapperClass(Map.class);
        medals_job.setReducerClass(Reduce.class);
        medals_job.setMapOutputKeyClass(Text.class);
        medals_job.setMapOutputValueClass(Text.class);
        medals_job.setOutputKeyClass(NullWritable.class);
        medals_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(medals_job, input_dir);
        FileOutputFormat.setOutputPath(medals_job, output_dir);
        // Reflect job success/failure in the process exit code.
        System.exit(medals_job.waitForCompletion(true) ? 0 : 1);
    }
}
当然,结果就是您希望的结果,如下所示: