Understanding MapReduce Code

I am trying to practice big-data MapReduce by building a movie recommendation system. My code:

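import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
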
public class MRS {
    // Mapper type parameters: <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        // Emits (userId, "movieId,rating") for each u.data record; the timestamp is skipped
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();

            StringTokenizer token = new StringTokenizer(line);

            while (token.hasMoreTokens()) {
                String userId = token.nextToken();
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken();
                con.write(new Text(userId), new Text(movieId + "," + ratings));
            }
        }
    }

    // Reducer type parameters: <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    public static class Reduce extends
            Reducer<Text, IntWritable, Text, Text> {
        // Intended output value: item_count,item_sum,[(movieId,rating),...]
        public void reduce(Text key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "[";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");
                while (token.hasMoreTokens()) {
                    token.nextToken();
                    item_sum = item_sum + Integer.parseInt(token.nextToken());
                    item_count++;
                }
                result = result + "(" + s + "),";
            }
            result = result.substring(0, result.length() - 1);
            result = result + "]";
            result = String.valueOf(item_count) + "," + String.valueOf(item_sum) + "," + result;

            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");

        job.setJarByClass(MRS.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

I am using the MovieLens dataset from here, where the input file is u.data.
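Each line of u.data is one rating with four tab-separated fields: user id, item (movie) id, rating, and timestamp. A minimal plain-Java sketch of what the mapper does with one such line follows; the helper name toKeyValue and the sample record are illustrative, not part of the original code. It splits on the tab character and checks the field count, whereas the StringTokenizer loop in the question would throw NoSuchElementException on a line with a missing field.

```java
import java.util.Arrays;

public class UDataLine {
    // Hypothetical helper: turns one u.data line ("userId \t movieId \t rating \t timestamp")
    // into the (key, value) pair the mapper emits: (userId, "movieId,rating").
    static String[] toKeyValue(String line) {
        String[] fields = line.split("\t");
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "expected 4 tab-separated fields, got " + fields.length + ": " + line);
        }
        // The timestamp (fields[3]) is dropped, as in the question's mapper.
        return new String[] { fields[0], fields[1] + "," + fields[2] };
    }

    public static void main(String[] args) {
        // Illustrative record in the u.data layout
        String[] kv = toKeyValue("196\t242\t3\t881250949");
        System.out.println(Arrays.toString(kv)); // [196, 242,3]
    }
}
```

Since TextInputFormat passes the mapper one line per map() call, the while loop in the original mapper runs at most once for a well-formed line.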

After running it, my output should look like this:

userId Item_count,Item_sum,[list of (movie_Id,rating)]
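The target line format can be checked outside Hadoop. This plain-Java sketch (summarize is a hypothetical name) computes what a reducer would have to emit for one userId, given the "movieId,rating" strings the mapper produces. The sample values are the first three of user 99's records from the output below.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class ExpectedFormat {
    // Hypothetical helper: builds "item_count,item_sum,[(movieId,rating),...]"
    // from the "movieId,rating" values the mapper emitted for one userId.
    static String summarize(Iterable<String> values) {
        int itemCount = 0;
        int itemSum = 0;
        // StringJoiner inserts the commas and the surrounding brackets, so no substring trimming is needed.
        StringJoiner list = new StringJoiner(",", "[", "]");
        for (String v : values) {
            String[] parts = v.split(",");
            itemSum += Integer.parseInt(parts[1]);
            itemCount++;
            list.add("(" + v + ")");
        }
        return itemCount + "," + itemSum + "," + list;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("173,4", "288,4", "66,3");
        // prints 99, a tab, then 3,11,[(173,4),(288,4),(66,3)]
        System.out.println("99\t" + summarize(values));
    }
}
```

Because this output no longer has the "movieId,rating" shape, a reducer that produces it is not safe to also register with setCombinerClass: Hadoop may run a combiner zero, one, or several times on map output, so the reducer could receive values it cannot parse.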

However, I got this:

99  173,4
99  288,4
99  66,3
99  203,4
99  105,2
99  12,5
99  1,4
99  741,3
99  895,3
99  619,4
99  742,5
99  294,4
99  196,4
99  328,4
99  120,2
99  246,3
99  232,4
99  181,5
99  201,3
99  978,3
99  123,3
99  433,4
99  345,3

This looks like the output of the Map class.
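Output of this shape is what Hadoop's base Reducer produces when a job's reduce() is never overridden: the default reduce() writes each value back out under its key. The plain-Java simulation below is a sketch under that assumption; the names shuffle and identityReduce are hypothetical, and a TreeMap stands in for Hadoop's sort-and-group step. It shows that such a job emits one line per map output pair, grouped and sorted by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IdentityReduceSim {
    // Shuffle stand-in: group map output by key. TreeMap<String, ...> sorts keys as strings,
    // similar to how Text keys are compared byte by byte.
    static Map<String, List<String>> shuffle(List<String[]> mapOutput) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : mapOutput) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    // Mirrors the default Reducer.reduce(): write each value back out with its key, unchanged.
    static List<String> identityReduce(Map<String, List<String>> groups) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            for (String v : e.getValue()) {
                lines.add(e.getKey() + "\t" + v);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Three of user 99's pairs from the output above, plus a hypothetical user "100"
        List<String[]> mapOutput = Arrays.asList(
                new String[] {"99", "173,4"},
                new String[] {"100", "1,5"},
                new String[] {"99", "288,4"},
                new String[] {"99", "66,3"});
        identityReduce(shuffle(mapOutput)).forEach(System.out::println);
    }
}
```

Because the keys are Text, they sort as strings, which is why the sketch prints the hypothetical user "100" before "99". In a real job the order of values within one key is not guaranteed.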

I made some adjustments to the code, and it gave me exactly the expected result. Here is my new code:

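import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;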

public class MRS {
    public static class Map extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] s = line.split("\t");
            StringTokenizer token = new StringTokenizer(line);

            while (token.hasMoreTokens()) {
                IntWritable userId = new IntWritable(Integer.parseInt(token.nextToken()));
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken();
                con.write(userId, new Text(movieId + "," + ratings));
            }
        }
    }

    public static class Reduce extends
            Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");

                result = result + "[" + s + "],";
            }
            result = result.substring(1, result.length() - 2);

            System.out.println(result);
            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");

        job.setJarByClass(MRS.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
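The string handling in the new Reduce can be checked without Hadoop. This sketch (plain Java; the class and method names are hypothetical) repeats the same concatenation and substring(1, result.length() - 2) steps on three of user 99's values. It yields 173,4],[288,4],[66,3: the leading [ and the trailing ], are trimmed, and item_count and item_sum are never added, so the line differs from the userId Item_count,Item_sum,[...] layout described earlier.

```java
import java.util.Arrays;
import java.util.List;

public class NewReduceCheck {
    // Same string steps as the new Reduce.reduce(), with Text replaced by String.
    static String buildLikeNewReduce(List<String> values) {
        String result = "";
        for (String s : values) {
            result = result + "[" + s + "],";
        }
        // Drops the first '[' and the final "],"
        return result.substring(1, result.length() - 2);
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("173,4", "288,4", "66,3");
        System.out.println(buildLikeNewReduce(values)); // 173,4],[288,4],[66,3
    }
}
```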

What I changed:

Driver code:

job.setOutputKeyClass(IntWritable.class);

Mapper code:

 Mapper<LongWritable, Text, IntWritable, Text>

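Reducer code, changed from:

 public static class Reduce extends
     Reducer<Text, IntWritable, Text, Text> {
     public void reduce(Text key, Iterable<Text> value, Context con)
             throws IOException, InterruptedException {

to:

 public static class Reduce extends
     Reducer<IntWritable, Text, IntWritable, Text> {
     public void reduce(IntWritable key, Iterable<Text> value, Context con)
             throws IOException, InterruptedException {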

I think the problem was that the output key and output value types matched those of the Mapper class, which is why it printed the mapper output without even executing the reducer.

Please correct me if I am wrong.
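A likely explanation, offered as a reading of the two versions rather than a confirmed diagnosis: the first Reduce is declared Reducer<Text, IntWritable, Text, Text>, so the inherited method is reduce(Text, Iterable<IntWritable>, Context), while the method written is reduce(Text, Iterable<Text>, Context). Because the parameter types differ, that method overloads reduce() instead of overriding it. Hadoop keeps calling the inherited default, which writes every pair through unchanged (and the same Reduce used as combiner does the same). In the new version the declared Reducer<IntWritable, Text, IntWritable, Text> matches reduce(IntWritable, Iterable<Text>, Context), so it overrides. On this reading, the switch from Text to IntWritable keys is incidental; Reducer<Text, Text, Text, Text> would also match. Annotating the method with @Override makes the compiler reject such a mismatch. The plain-Java demo below imitates this with a hypothetical BaseReducer, which is not Hadoop's class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverrideDemo {
    // Hypothetical stand-in for Hadoop's Reducer<KEYIN, VALUEIN, ...>. Like the real class,
    // its default reduce() writes every value back out under its key.
    static class BaseReducer<KEYIN, VALUEIN> {
        final List<String> output = new ArrayList<>();

        protected void reduce(KEYIN key, Iterable<VALUEIN> values) {
            for (VALUEIN v : values) {
                output.add(key + "\t" + v);
            }
        }

        // The "framework" only ever calls reduce() through this base-class signature.
        public void run(KEYIN key, Iterable<VALUEIN> values) {
            reduce(key, values);
        }
    }

    // Like the first version: VALUEIN is Integer, but reduce() takes Iterable<String>.
    // This compiles, but it is an overload, so run() never calls it.
    // Adding @Override here would turn the mistake into a compile error.
    static class MismatchedReducer extends BaseReducer<String, Integer> {
        protected void reduce(String key, Iterable<String> values) {
            output.add(key + "\tcustom reduce ran");
        }
    }

    // Like the second version: the declared types match the method, so it overrides.
    static class MatchingReducer extends BaseReducer<String, String> {
        @Override
        protected void reduce(String key, Iterable<String> values) {
            output.add(key + "\t" + String.join(";", values));
        }
    }

    // Generic types are erased at run time, so the "framework" passes whatever values it has,
    // much as Hadoop hands Text values to a reducer declared with IntWritable.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List<String> runFramework(BaseReducer reducer, String key, List<String> values) {
        reducer.run(key, values);
        return reducer.output;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("173,4", "288,4");
        System.out.println(runFramework(new MismatchedReducer(), "99", values)); // pass-through output
        System.out.println(runFramework(new MatchingReducer(), "99", values));   // custom reduce output
    }
}
```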