理解 MapReduce 代码
Understanding Mapreduce Code
我正在尝试通过制作电影推荐系统来实践大数据Mapreduce。我的代码:
*imports
/**
 * Hadoop MapReduce job ("Movie Recommendation") that is meant to group rating
 * records by user and emit, per user, "item_count,item_sum,[(movieId,rating),...]".
 *
 * NOTE(review): as written, the custom reduce logic does not appear to be invoked;
 * see the note on {@link Reduce}.
 */
public class MRS {
/**
 * Mapper: tokenizes each input line on whitespace and emits
 * (userId, "movieId,ratings") as a Text/Text pair.
 */
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
/**
 * Consumes the line's tokens in groups of four: userId, movieId, ratings and a
 * fourth token that is discarded (presumably the timestamp column of u.data —
 * TODO confirm).
 *
 * @param key   input key supplied by the input format (unused here)
 * @param value one line of input text
 * @param con   context used to emit the (userId, "movieId,ratings") pair
 */
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
// Each iteration reads four tokens; nextToken() throws NoSuchElementException
// if the number of tokens on the line is not a multiple of four.
while(token.hasMoreTokens()){
String userId = token.nextToken();
String movieId = token.nextToken();
String ratings =token.nextToken();
// Fourth token is read only to skip over it.
token.nextToken();
con.write(new Text(userId), new Text(movieId + "," + ratings));
}
}
}
/**
 * Intended reducer: for one user, counts the ratings, sums them and builds a
 * bracketed list of the "(movieId,rating)" pairs.
 *
 * NOTE(review): the class is declared as Reducer<Text, IntWritable, Text, Text>
 * (input values are IntWritable), but reduce() below takes Iterable<Text>. Its
 * parameter types therefore differ from the inherited
 * reduce(Text, Iterable<IntWritable>, Context), so it looks like an overload
 * rather than an override; the framework would then call the base-class reduce,
 * which Hadoop documents as an identity function. That would explain output
 * identical to the mapper output. Adding @Override would let the compiler
 * confirm this.
 */
public static class Reduce extends
Reducer<Text, IntWritable, Text, Text> {
/**
 * Builds "item_count,item_sum,[(movieId,rating),...]" for one key and writes it.
 *
 * @param key   the user id
 * @param value the "movieId,rating" strings grouped under this key
 * @param con   context used to emit the (key, result) pair
 */
public void reduce(Text key, Iterable<Text> value,Context con ) throws IOException, InterruptedException{
int item_count=0;
int item_sum =0;
String result="[";
for(Text t : value){
String s = t.toString();
StringTokenizer token = new StringTokenizer(s,",");
// Tokens are consumed in pairs: the first (movieId) is skipped, the second is
// parsed as an integer rating and added to the running sum.
while(token.hasMoreTokens()){
token.nextToken();
item_sum=item_sum+Integer.parseInt(token.nextToken());
item_count++;
}
result=result+"("+s+"),";
}
// Drop the last character (the trailing comma after the final pair) before closing the list.
result=result.substring(0, result.length()-1);
result=result+"]";
result=String.valueOf(item_count)+","+String.valueOf(item_sum)+","+result;
con.write(key, new Text(result));
}
}
/**
 * Configures and runs the job.
 *
 * @param args args[0] is the input path, args[1] is the output path
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration con = new Configuration();
Job job = new Job(con,"Movie Recommendation");
job.setJarByClass(MRS.class);
job.setMapperClass(Map.class);
// NOTE(review): Reduce is registered as the combiner as well as the reducer. Its
// output format ("count,sum,[...]") differs from its input format
// ("movieId,rating"), so it is presumably not safe as a combiner once the
// reduce signature is corrected — confirm before keeping this line.
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Exit status 0 when the job completes successfully, 1 otherwise.
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我使用的是 MovieLens 数据集(原文此处的 “here” 为数据集链接),
其中输入文件是 u.data。
运行之后,我的输出应该是这样的:
userId Item_count,Item_sum,[movie_Id 的列表,评分]
但是,我得到了这个
99 173,4
99 288,4
99 66,3
99 203,4
99 105,2
99 12,5
99 1,4
99 741,3
99 895,3
99 619,4
99 742,5
99 294,4
99 196,4
99 328,4
99 120,2
99 246,3
99 232,4
99 181,5
99 201,3
99 978,3
99 123,3
99 433,4
99 345,3
这应该是 Map 类(mapper)的输出。
我对代码做了一些调整,它给了我准确的预期结果。
这是我的新代码
imports*
public class MRS {
public static class Map extends
Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException {
String line = value.toString();
String[] s = line.split("\t");
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens()) {
IntWritable userId = new IntWritable(Integer.parseInt(token
.nextToken()));
String movieId = token.nextToken();
String ratings = token.nextToken();
token.nextToken();
con.write(userId, new Text(movieId + "," + ratings));
}
}
}
public static class Reduce extends
Reducer<IntWritable, Text, IntWritable, Text> {
public void reduce(IntWritable key, Iterable<Text> value, Context con)
throws IOException, InterruptedException {
int item_count = 0;
int item_sum = 0;
String result = "";
for (Text t : value) {
String s = t.toString();
StringTokenizer token = new StringTokenizer(s, ",");
result = result + "[" + s + "],";
}
result = result.substring(1, result.length() - 2);
System.out.println(result);
con.write(key, new Text(result));
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration con = new Configuration();
Job job = new Job(con, "Movie Recommendation");
job.setJarByClass(MRS.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我改变的是
Driver代码
job.setOutputKeyClass(IntWritable.class);
映射器代码
Mapper<LongWritable, Text, IntWritable, Text>
Reducer 代码
public static class Reduce extends
Reducer<Text, IntWritable, Text, Text> {
public void reduce(Text key, Iterable<Text> value,Context con ) throws
IOException, InterruptedException{
我认为问题在于 job 的 output key 和 output value 类型与 mapper 的输出类型相匹配,所以它直接打印了 mapper 的输出,甚至没有执行 reducer。
如有错误,请指正。
我正在尝试通过制作电影推荐系统来实践大数据Mapreduce。我的代码:
*imports
/**
 * Hadoop MapReduce job ("Movie Recommendation") that is meant to group rating
 * records by user and emit, per user, "item_count,item_sum,[(movieId,rating),...]".
 *
 * NOTE(review): as written, the custom reduce logic does not appear to be invoked;
 * see the note on {@link Reduce}.
 */
public class MRS {
/**
 * Mapper: tokenizes each input line on whitespace and emits
 * (userId, "movieId,ratings") as a Text/Text pair.
 */
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
/**
 * Consumes the line's tokens in groups of four: userId, movieId, ratings and a
 * fourth token that is discarded (presumably the timestamp column of u.data —
 * TODO confirm).
 *
 * @param key   input key supplied by the input format (unused here)
 * @param value one line of input text
 * @param con   context used to emit the (userId, "movieId,ratings") pair
 */
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
// Each iteration reads four tokens; nextToken() throws NoSuchElementException
// if the number of tokens on the line is not a multiple of four.
while(token.hasMoreTokens()){
String userId = token.nextToken();
String movieId = token.nextToken();
String ratings =token.nextToken();
// Fourth token is read only to skip over it.
token.nextToken();
con.write(new Text(userId), new Text(movieId + "," + ratings));
}
}
}
/**
 * Intended reducer: for one user, counts the ratings, sums them and builds a
 * bracketed list of the "(movieId,rating)" pairs.
 *
 * NOTE(review): the class is declared as Reducer<Text, IntWritable, Text, Text>
 * (input values are IntWritable), but reduce() below takes Iterable<Text>. Its
 * parameter types therefore differ from the inherited
 * reduce(Text, Iterable<IntWritable>, Context), so it looks like an overload
 * rather than an override; the framework would then call the base-class reduce,
 * which Hadoop documents as an identity function. That would explain output
 * identical to the mapper output. Adding @Override would let the compiler
 * confirm this.
 */
public static class Reduce extends
Reducer<Text, IntWritable, Text, Text> {
/**
 * Builds "item_count,item_sum,[(movieId,rating),...]" for one key and writes it.
 *
 * @param key   the user id
 * @param value the "movieId,rating" strings grouped under this key
 * @param con   context used to emit the (key, result) pair
 */
public void reduce(Text key, Iterable<Text> value,Context con ) throws IOException, InterruptedException{
int item_count=0;
int item_sum =0;
String result="[";
for(Text t : value){
String s = t.toString();
StringTokenizer token = new StringTokenizer(s,",");
// Tokens are consumed in pairs: the first (movieId) is skipped, the second is
// parsed as an integer rating and added to the running sum.
while(token.hasMoreTokens()){
token.nextToken();
item_sum=item_sum+Integer.parseInt(token.nextToken());
item_count++;
}
result=result+"("+s+"),";
}
// Drop the last character (the trailing comma after the final pair) before closing the list.
result=result.substring(0, result.length()-1);
result=result+"]";
result=String.valueOf(item_count)+","+String.valueOf(item_sum)+","+result;
con.write(key, new Text(result));
}
}
/**
 * Configures and runs the job.
 *
 * @param args args[0] is the input path, args[1] is the output path
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration con = new Configuration();
Job job = new Job(con,"Movie Recommendation");
job.setJarByClass(MRS.class);
job.setMapperClass(Map.class);
// NOTE(review): Reduce is registered as the combiner as well as the reducer. Its
// output format ("count,sum,[...]") differs from its input format
// ("movieId,rating"), so it is presumably not safe as a combiner once the
// reduce signature is corrected — confirm before keeping this line.
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Exit status 0 when the job completes successfully, 1 otherwise.
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我使用的是 MovieLens 数据集(原文此处的 “here” 为数据集链接),其中输入文件是 u.data。
运行之后,我的输出应该是这样的:
userId Item_count,Item_sum,[movie_Id 的列表,评分]
但是,我得到了这个
99 173,4
99 288,4
99 66,3
99 203,4
99 105,2
99 12,5
99 1,4
99 741,3
99 895,3
99 619,4
99 742,5
99 294,4
99 196,4
99 328,4
99 120,2
99 246,3
99 232,4
99 181,5
99 201,3
99 978,3
99 123,3
99 433,4
99 345,3
这应该是 Map 类(mapper)的输出。
我对代码做了一些调整,它给了我准确的预期结果。 这是我的新代码
imports*
public class MRS {
public static class Map extends
Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException {
String line = value.toString();
String[] s = line.split("\t");
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens()) {
IntWritable userId = new IntWritable(Integer.parseInt(token
.nextToken()));
String movieId = token.nextToken();
String ratings = token.nextToken();
token.nextToken();
con.write(userId, new Text(movieId + "," + ratings));
}
}
}
public static class Reduce extends
Reducer<IntWritable, Text, IntWritable, Text> {
public void reduce(IntWritable key, Iterable<Text> value, Context con)
throws IOException, InterruptedException {
int item_count = 0;
int item_sum = 0;
String result = "";
for (Text t : value) {
String s = t.toString();
StringTokenizer token = new StringTokenizer(s, ",");
result = result + "[" + s + "],";
}
result = result.substring(1, result.length() - 2);
System.out.println(result);
con.write(key, new Text(result));
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration con = new Configuration();
Job job = new Job(con, "Movie Recommendation");
job.setJarByClass(MRS.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我改变的是 Driver代码
job.setOutputKeyClass(IntWritable.class);
映射器代码
Mapper<LongWritable, Text, IntWritable, Text>
Reducer 代码
public static class Reduce extends
Reducer<Text, IntWritable, Text, Text> {
public void reduce(Text key, Iterable<Text> value,Context con ) throws
IOException, InterruptedException{
我认为问题在于 job 的 output key 和 output value 类型与 mapper 的输出类型相匹配,所以它直接打印了 mapper 的输出,甚至没有执行 reducer。
如有错误,请指正。