Mapreduce 自定义密钥不起作用

Mapreduce custom key not working

我是 Map Reduce 的新手,正在尝试解决一些问题,以便通过实施更好地学习。

背景:

我从 movielens.com 获得了一个数据集,其中包含各种电影的评分。我正在尝试计算电影的最大评分,并按评分计数降序对最终输出进行排序(输出的默认排序是按电影 ID)。我想要这样的东西:

movieId: rating_count(在rating_count上降序排列)

我在网上搜索了一下,发现我可以通过使用自定义密钥来实现。所以我正在尝试使用它但没有得到正确的结果。

在调试时,发现在 mapper 中一切正常,但问题出在 reducer 中。在 reducer 中,输入键始终是我文件中的最后一条记录,即映射器处理的最后一条记录,因此我得到错误的输出。

我附上我的 class供参考:

主要class:

public final class MovieLens_CustomSort {

public static class Map extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private IntWritable movieId;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String row = value.toString();
        String splitRow[] = row.split("::");
        CompositeKey compositeKey = new CompositeKey(Integer.valueOf(splitRow[1]), 1);
        context.write(compositeKey, one);
    }
}

public static class Reduce extends Reducer<CompositeKey, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        Text outputKey = new Text(key.toString());

        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        context.write(outputKey, new IntWritable(sum));
    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "max movie review");

    job.setSortComparatorClass(CompositeKeyComparator.class);
    job.setMapOutputKeyClass(CompositeKey.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}  }

自定义键:

public final class CompositeKey implements WritableComparable<CompositeKey> {

private int m_movieId;
private int m_count;

public CompositeKey() {

}

public CompositeKey(int movieId, int count) {
    m_count = count;
    m_movieId = movieId;
}

@Override
public int compareTo(CompositeKey o) {
    return Integer.compare(o.getCount(), this.getCount());
}

@Override
public void write(DataOutput out) throws IOException {
    out.writeInt(m_movieId);
    out.writeInt(m_count);
}

@Override
public void readFields(DataInput in) throws IOException {
    m_movieId = in.readInt();
    m_count = in.readInt();
}

public int getCount() {
    return m_count;
}

public int getMovieId() {
    return m_movieId;
}

@Override
public String toString() {
    return "MovieId = " + m_movieId + " , count = " + m_count;
}}

自定义键比较器:

public class CompositeKeyComparator extends WritableComparator {

protected CompositeKeyComparator() {
    super(CompositeKey.class, true);
}

@Override
public int compare(WritableComparable w1, WritableComparable w2) {
    CompositeKey c1 = (CompositeKey)w1;
    CompositeKey c2 = (CompositeKey)w2;

    return Integer.compare(c2.getCount(), c1.getCount());
}}

P.S : 我知道我的密钥 class 没有多大意义,但这只是为了学习目的而创建的。

我已经解决了这个问题。问题出在 CompositeKeyComparator 中,我根据计数进行比较,映射器之后的每条记录都是 1,因此每条记录都呈现为相等。一旦我更改了与电影 ID 的比较,它工作正常。