MapReduce 堆排序
MapReduce sorting with heap
我正在尝试分析包含 follower
和 followee
对的社交网络数据。我想找到使用 MapReduce 的关注者最多的 前 10 位用户。
我用一个 MapReduce 步骤制作了一对 userID
和 number_of_followee
。
但是,对于这些数据,我不确定如何在分布式系统中对它们进行排序。
我不确定如何在 Mappers 和 Reducers 中使用 priority queue
,因为它们具有分布式数据。
有人能解释一下如何使用数据结构对海量数据进行排序吗?
非常感谢。
如果您有格式为 user_id = number_of_followers
的大输入文件(多个文件),用于查找顶级 N
用户的简单 map-reduce 算法是:
- 每个映射器处理自己的输入并在其文件中找到前 N 个用户,将它们写入单个缩减器
- 单个 reducer 接收
number_of_mappers * N
行并在其中找到前 N 个用户
要按降序对数据进行排序,您需要另一个 mapreduce
作业。映射器将发出 "number of followers" 作为键和 twitter 句柄作为值。
class SortingMap extends Map<LongWritable, Text, LongWritable, Text> {
private Text value = new Text();
private LongWritable key = new LongWritable(0);
@Overwrite
public void map(LongWritable key, Text value, Context context) throws IOException {
String line = value.toString();
// Assuming that the input data is "TweeterId <number of follower>" separated by tab
String tokens[] = value.split(Pattern.quote("\t"));
if(tokens.length > 1) {
key.set(Long.parseLong(tokens[1]));
value.set(tokens[0]);
context.write(key, value);
}
}
}
对于reducer,使用IdentityReducer<K,V>
// SortedComparator Class
public class DescendingOrderKeyComparator extends WritableComparator {
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
return -1 * w1.compareTo(w2);
}
}
在驱动程序 Class 中,设置 SortedComparator
job.setSortComparatorClass(DescendingOrderKeyComparator.class);
我正在尝试分析包含 follower
和 followee
对的社交网络数据。我想找到使用 MapReduce 的关注者最多的 前 10 位用户。
我用一个 MapReduce 步骤制作了一对 userID
和 number_of_followee
。
但是,对于这些数据,我不确定如何在分布式系统中对它们进行排序。
我不确定如何在 Mappers 和 Reducers 中使用 priority queue
,因为它们具有分布式数据。
有人能解释一下如何使用数据结构对海量数据进行排序吗?
非常感谢。
如果您有格式为 user_id = number_of_followers
的大输入文件(多个文件),用于查找顶级 N
用户的简单 map-reduce 算法是:
- 每个映射器处理自己的输入并在其文件中找到前 N 个用户,将它们写入单个缩减器
- 单个 reducer 接收
number_of_mappers * N
行并在其中找到前 N 个用户
要按降序对数据进行排序,您需要另一个 mapreduce
作业。映射器将发出 "number of followers" 作为键和 twitter 句柄作为值。
class SortingMap extends Map<LongWritable, Text, LongWritable, Text> {
private Text value = new Text();
private LongWritable key = new LongWritable(0);
@Overwrite
public void map(LongWritable key, Text value, Context context) throws IOException {
String line = value.toString();
// Assuming that the input data is "TweeterId <number of follower>" separated by tab
String tokens[] = value.split(Pattern.quote("\t"));
if(tokens.length > 1) {
key.set(Long.parseLong(tokens[1]));
value.set(tokens[0]);
context.write(key, value);
}
}
}
对于reducer,使用IdentityReducer<K,V>
// SortedComparator Class
public class DescendingOrderKeyComparator extends WritableComparator {
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
return -1 * w1.compareTo(w2);
}
}
在驱动程序 Class 中,设置 SortedComparator
job.setSortComparatorClass(DescendingOrderKeyComparator.class);