Mapreduce 自定义密钥不起作用
Mapreduce custom key not working
我是 Map Reduce 的新手,正在尝试解决一些问题,以便通过实施更好地学习。
背景:
我从 movielens.com 获得了一个数据集,其中包含各种电影的评分。我正在尝试计算电影的最大评分,并按评分计数降序对最终输出进行排序(输出的默认排序是按电影 ID)。我想要这样的东西:
movieId: rating_count(在rating_count上降序排列)
我在网上搜索了一下,发现我可以通过使用自定义密钥来实现。所以我正在尝试使用它但没有得到正确的结果。
在调试时,发现在 mapper 中一切正常,但问题出在 reducer 中。在 reducer 中,输入键始终是我文件中的最后一条记录,即映射器处理的最后一条记录,因此我得到错误的输出。
我附上我的 class供参考:
主要class:
public final class MovieLens_CustomSort {
public static class Map extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {
private IntWritable one = new IntWritable(1);
private IntWritable movieId;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String row = value.toString();
String splitRow[] = row.split("::");
CompositeKey compositeKey = new CompositeKey(Integer.valueOf(splitRow[1]), 1);
context.write(compositeKey, one);
}
}
public static class Reduce extends Reducer<CompositeKey, IntWritable, Text, IntWritable> {
@Override
protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
Text outputKey = new Text(key.toString());
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
sum += iterator.next().get();
}
context.write(outputKey, new IntWritable(sum));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "max movie review");
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setMapOutputKeyClass(CompositeKey.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
} }
自定义键:
public final class CompositeKey implements WritableComparable<CompositeKey> {
private int m_movieId;
private int m_count;
public CompositeKey() {
}
public CompositeKey(int movieId, int count) {
m_count = count;
m_movieId = movieId;
}
@Override
public int compareTo(CompositeKey o) {
return Integer.compare(o.getCount(), this.getCount());
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(m_movieId);
out.writeInt(m_count);
}
@Override
public void readFields(DataInput in) throws IOException {
m_movieId = in.readInt();
m_count = in.readInt();
}
public int getCount() {
return m_count;
}
public int getMovieId() {
return m_movieId;
}
@Override
public String toString() {
return "MovieId = " + m_movieId + " , count = " + m_count;
}}
自定义键比较器:
public class CompositeKeyComparator extends WritableComparator {
protected CompositeKeyComparator() {
super(CompositeKey.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKey c1 = (CompositeKey)w1;
CompositeKey c2 = (CompositeKey)w2;
return Integer.compare(c2.getCount(), c1.getCount());
}}
P.S : 我知道我的密钥 class 没有多大意义,但这只是为了学习目的而创建的。
我已经解决了这个问题。问题出在 CompositeKeyComparator 中,我根据计数进行比较,映射器之后的每条记录都是 1,因此每条记录都呈现为相等。一旦我更改了与电影 ID 的比较,它工作正常。
我是 Map Reduce 的新手,正在尝试解决一些问题,以便通过实施更好地学习。
背景:
我从 movielens.com 获得了一个数据集,其中包含各种电影的评分。我正在尝试计算电影的最大评分,并按评分计数降序对最终输出进行排序(输出的默认排序是按电影 ID)。我想要这样的东西:
movieId: rating_count(在rating_count上降序排列)
我在网上搜索了一下,发现我可以通过使用自定义密钥来实现。所以我正在尝试使用它但没有得到正确的结果。
在调试时,发现在 mapper 中一切正常,但问题出在 reducer 中。在 reducer 中,输入键始终是我文件中的最后一条记录,即映射器处理的最后一条记录,因此我得到错误的输出。
我附上我的 class供参考:
主要class:
public final class MovieLens_CustomSort {
public static class Map extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {
private IntWritable one = new IntWritable(1);
private IntWritable movieId;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String row = value.toString();
String splitRow[] = row.split("::");
CompositeKey compositeKey = new CompositeKey(Integer.valueOf(splitRow[1]), 1);
context.write(compositeKey, one);
}
}
public static class Reduce extends Reducer<CompositeKey, IntWritable, Text, IntWritable> {
@Override
protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
Text outputKey = new Text(key.toString());
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
sum += iterator.next().get();
}
context.write(outputKey, new IntWritable(sum));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "max movie review");
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setMapOutputKeyClass(CompositeKey.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
} }
自定义键:
public final class CompositeKey implements WritableComparable<CompositeKey> {
private int m_movieId;
private int m_count;
public CompositeKey() {
}
public CompositeKey(int movieId, int count) {
m_count = count;
m_movieId = movieId;
}
@Override
public int compareTo(CompositeKey o) {
return Integer.compare(o.getCount(), this.getCount());
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(m_movieId);
out.writeInt(m_count);
}
@Override
public void readFields(DataInput in) throws IOException {
m_movieId = in.readInt();
m_count = in.readInt();
}
public int getCount() {
return m_count;
}
public int getMovieId() {
return m_movieId;
}
@Override
public String toString() {
return "MovieId = " + m_movieId + " , count = " + m_count;
}}
自定义键比较器:
public class CompositeKeyComparator extends WritableComparator {
protected CompositeKeyComparator() {
super(CompositeKey.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKey c1 = (CompositeKey)w1;
CompositeKey c2 = (CompositeKey)w2;
return Integer.compare(c2.getCount(), c1.getCount());
}}
P.S : 我知道我的密钥 class 没有多大意义,但这只是为了学习目的而创建的。
我已经解决了这个问题。问题出在 CompositeKeyComparator 中,我根据计数进行比较,映射器之后的每条记录都是 1,因此每条记录都呈现为相等。一旦我更改了与电影 ID 的比较,它工作正常。