Java8 中的 MapReduce 输出排序
MapReduce output ordering in Java8
我尝试使用解决方案对 Hadoop
中的减速器输出进行排序,如这个问题所述:
MapReduce sort by value in descending order
这个和 Java8 有一些冲突,所以我解决了它们:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;
public class HourlyTweetsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public Map<String , Integer> map = new LinkedHashMap<String , Integer>();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
map.put(key.toString() , sum);
result.set(sum);
context.write(key, result);
}
public void cleanup(Context context){
//Cleanup is called once at the end to finish off anything for reducer
//Here we will write our final output
Map<String , Integer> sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);
for (Map.Entry<String,Integer> entry : sortedMap.entrySet()){
context.write(new Text(entry.getKey()),new IntWritable(entry.getValue()));
}
}
public Map<String , Integer > sortMap (Map<String,Integer> unsortMap){
Map<String ,Integer> hashmap = new HashMap<String,Integer>();
int count=0;
List<Map.Entry<String,Integer>> list = new LinkedList<Map.Entry<String,Integer>>(unsortMap.entrySet());
//Sorting the list we created from unsorted Map
Collections.sort(list , new Comparator<Map.Entry<String,Integer>>(){
public int compare (Map.Entry<String , Integer> o1 , Map.Entry<String , Integer> o2 ){
//sorting in descending order
return o2.getValue().compareTo(o1.getValue());
}
});
for(Map.Entry<String, Integer> entry : list){
// only writing top 3 in the sorted map
// if(count>2)
// break;
hashmap.put(entry.getKey(),entry.getValue());
}
return hashmap ;
}
}
问题是输出没有在 运行 作业之后排序:
11 1041557
14 1304166
17 1434978
2 733462
20 1288767
23 1677571
5 460629
8 497403
11 1041557
23 1677571
2 733462
14 1304166
5 460629
17 1434978
8 497403
20 1288767
如何解决?
我在这里看到两个选项:
- 只使用一个减速器。这就要求所有的数据都能装在一台机器的内存中。然后,单个reducer的输入会按照key(你想要的)的顺序进行排序。
- 使用TotalOrderPartitioner
https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.html
它在 MapReduce 管道中强制执行一个附加阶段,以将元素划分到排序的桶中。
这是一个示例(不是我的),说明如何使用 TotalOrderPartioner:https://gist.github.com/asimjalis/e5627dc2ff2b23dac70b
我不打算判断此代码是否需要额外的步骤来确保在 Hadoop 的上下文中的正确性 Map/Reduce。
但是一个明显的错误是 sortMap
的开头有一行
Map<String ,Integer> hashmap = new HashMap<String,Integer>();
这会创建一个不维护任何特定顺序的地图,因此按排序顺序填充它没有任何效果。它应该是 LinkedHashMap
,如链接的问答代码中所示。
请注意,这与调用者创建的地图无关:
Map<String , Integer> sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);
这里,对创建地图的引用被 sortMap
的结果覆盖,因此地图实例完全过时了。但是,由于您要做的只是在已排序的映射上迭代一次以执行一个操作,因此您根本不需要将已排序的列表复制到结果 Map
,因为您已经可以执行该操作了遍历列表:
public void cleanup(Context context) {
//Cleanup is called once at the end to finish off anything for reducer
//Here we will write our final output
List<Map.Entry<String,Integer>> list = new ArrayList<>(map.entrySet());
Collections.sort(list, Map.Entry.comparingByValue(Comparator.reverseOrder()));
for(Map.Entry<String,Integer> entry: list) {
context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
}
}
这使用了 Java 8 的 Map.Entry.comparingByValue(Comparator.reverseOrder())
内置比较器。如果需要 Java 7 兼容性,请使用问题中显示的比较器代码,
new Comparator<Map.Entry<String, Integer>>() {
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
//sorting in descending order
return o2.getValue().compareTo(o1.getValue());
}
}
请注意,此代码使用 ArrayList
而不是 LinkedList
,因为您将对其执行所有三个操作,1) 使用地图条目集的内容对其进行初始化,2)就地排序和 3) 迭代它,ArrayList
的工作速度大大加快。对于 Java 8.
中的步骤 2) 尤其如此
我尝试使用解决方案对 Hadoop
中的减速器输出进行排序,如这个问题所述:
MapReduce sort by value in descending order
这个和 Java8 有一些冲突,所以我解决了它们:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;
public class HourlyTweetsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public Map<String , Integer> map = new LinkedHashMap<String , Integer>();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
map.put(key.toString() , sum);
result.set(sum);
context.write(key, result);
}
public void cleanup(Context context){
//Cleanup is called once at the end to finish off anything for reducer
//Here we will write our final output
Map<String , Integer> sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);
for (Map.Entry<String,Integer> entry : sortedMap.entrySet()){
context.write(new Text(entry.getKey()),new IntWritable(entry.getValue()));
}
}
public Map<String , Integer > sortMap (Map<String,Integer> unsortMap){
Map<String ,Integer> hashmap = new HashMap<String,Integer>();
int count=0;
List<Map.Entry<String,Integer>> list = new LinkedList<Map.Entry<String,Integer>>(unsortMap.entrySet());
//Sorting the list we created from unsorted Map
Collections.sort(list , new Comparator<Map.Entry<String,Integer>>(){
public int compare (Map.Entry<String , Integer> o1 , Map.Entry<String , Integer> o2 ){
//sorting in descending order
return o2.getValue().compareTo(o1.getValue());
}
});
for(Map.Entry<String, Integer> entry : list){
// only writing top 3 in the sorted map
// if(count>2)
// break;
hashmap.put(entry.getKey(),entry.getValue());
}
return hashmap ;
}
}
问题是输出没有在 运行 作业之后排序:
11 1041557
14 1304166
17 1434978
2 733462
20 1288767
23 1677571
5 460629
8 497403
11 1041557
23 1677571
2 733462
14 1304166
5 460629
17 1434978
8 497403
20 1288767
如何解决?
我在这里看到两个选项:
- 只使用一个减速器。这就要求所有的数据都能装在一台机器的内存中。然后,单个reducer的输入会按照key(你想要的)的顺序进行排序。
- 使用TotalOrderPartitioner https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.html
它在 MapReduce 管道中强制执行一个附加阶段,以将元素划分到排序的桶中。
这是一个示例(不是我的),说明如何使用 TotalOrderPartioner:https://gist.github.com/asimjalis/e5627dc2ff2b23dac70b
我不打算判断此代码是否需要额外的步骤来确保在 Hadoop 的上下文中的正确性 Map/Reduce。
但是一个明显的错误是 sortMap
的开头有一行
Map<String ,Integer> hashmap = new HashMap<String,Integer>();
这会创建一个不维护任何特定顺序的地图,因此按排序顺序填充它没有任何效果。它应该是 LinkedHashMap
,如链接的问答代码中所示。
请注意,这与调用者创建的地图无关:
Map<String , Integer> sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);
这里,对创建地图的引用被 sortMap
的结果覆盖,因此地图实例完全过时了。但是,由于您要做的只是在已排序的映射上迭代一次以执行一个操作,因此您根本不需要将已排序的列表复制到结果 Map
,因为您已经可以执行该操作了遍历列表:
public void cleanup(Context context) {
//Cleanup is called once at the end to finish off anything for reducer
//Here we will write our final output
List<Map.Entry<String,Integer>> list = new ArrayList<>(map.entrySet());
Collections.sort(list, Map.Entry.comparingByValue(Comparator.reverseOrder()));
for(Map.Entry<String,Integer> entry: list) {
context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
}
}
这使用了 Java 8 的 Map.Entry.comparingByValue(Comparator.reverseOrder())
内置比较器。如果需要 Java 7 兼容性,请使用问题中显示的比较器代码,
new Comparator<Map.Entry<String, Integer>>() {
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
//sorting in descending order
return o2.getValue().compareTo(o1.getValue());
}
}
请注意,此代码使用 ArrayList
而不是 LinkedList
,因为您将对其执行所有三个操作,1) 使用地图条目集的内容对其进行初始化,2)就地排序和 3) 迭代它,ArrayList
的工作速度大大加快。对于 Java 8.