Java8 中的 MapReduce 输出排序

MapReduce output ordering in Java8

我尝试使用解决方案对 Hadoop 中的减速器输出进行排序,如这个问题所述:

MapReduce sort by value in descending order

这个和 Java8 有一些冲突,所以我解决了它们:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;

public class HourlyTweetsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    public Map<String , Integer> map = new LinkedHashMap<String , Integer>();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        map.put(key.toString() , sum);

        result.set(sum);
        context.write(key, result);
    }

    public void cleanup(Context context){
        //Cleanup is called once at the end to finish off anything for reducer
        //Here we will write our final output
        Map<String , Integer>  sortedMap = new HashMap<String , Integer>();
        sortedMap = sortMap(map);

        for (Map.Entry<String,Integer> entry : sortedMap.entrySet()){
            context.write(new Text(entry.getKey()),new IntWritable(entry.getValue()));
        }
    }

    public Map<String , Integer > sortMap (Map<String,Integer> unsortMap){

        Map<String ,Integer> hashmap = new HashMap<String,Integer>();
        int count=0;
        List<Map.Entry<String,Integer>> list = new LinkedList<Map.Entry<String,Integer>>(unsortMap.entrySet());
        //Sorting the list we created from unsorted Map
        Collections.sort(list , new Comparator<Map.Entry<String,Integer>>(){
            public int compare (Map.Entry<String , Integer> o1 , Map.Entry<String , Integer> o2 ){
                //sorting in descending order
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        for(Map.Entry<String, Integer> entry : list){
            // only writing top 3 in the sorted map
            // if(count>2)
            // break;
            hashmap.put(entry.getKey(),entry.getValue());
        }

        return hashmap ;
    }

}

问题是输出没有在 运行 作业之后排序:

11  1041557
14  1304166
17  1434978
2   733462
20  1288767
23  1677571
5   460629
8   497403
11  1041557
23  1677571
2   733462
14  1304166
5   460629
17  1434978
8   497403
20  1288767

如何解决?

我在这里看到两个选项:

  1. 只使用一个减速器。这就要求所有的数据都能装在一台机器的内存中。然后,单个reducer的输入会按照key(你想要的)的顺序进行排序。
  2. 使用TotalOrderPartitioner https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.html

它在 MapReduce 管道中强制执行一个附加阶段,以将元素划分到排序的桶中。

这是一个示例(不是我的),说明如何使用 TotalOrderPartioner:https://gist.github.com/asimjalis/e5627dc2ff2b23dac70b

我不打算判断此代码​​是否需要额外的步骤来确保在 Hadoop 的上下文中的正确性 Map/Reduce。

但是一个明显的错误是 sortMap 的开头有一行

Map<String ,Integer> hashmap = new HashMap<String,Integer>();

这会创建一个不维护任何特定顺序的地图,因此按排序顺序填充它没有任何效果。它应该是 LinkedHashMap,如链接的问答代码中所示。

请注意,这与调用者创建的地图无关:

Map<String , Integer>  sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);

这里,对创建地图的引用被 sortMap 的结果覆盖,因此地图实例完全过时了。但是,由于您要做的只是在已排序的映射上迭代一次以执行一个操作,因此您根本不需要将已排序的列表复制到结果 Map,因为您已经可以执行该操作了遍历列表:

public void cleanup(Context context) {
    //Cleanup is called once at the end to finish off anything for reducer
    //Here we will write our final output

    List<Map.Entry<String,Integer>> list = new ArrayList<>(map.entrySet());

    Collections.sort(list, Map.Entry.comparingByValue(Comparator.reverseOrder()));

    for(Map.Entry<String,Integer> entry: list) {
        context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
    }
}

这使用了 Java 8 的 Map.Entry.comparingByValue(Comparator.reverseOrder()) 内置比较器。如果需要 Java 7 兼容性,请使用问题中显示的比较器代码,

new Comparator<Map.Entry<String, Integer>>() {
    public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
        //sorting in descending order
        return o2.getValue().compareTo(o1.getValue());
    }
}

请注意,此代码使用 ArrayList 而不是 LinkedList,因为您将对其执行所有三个操作,1) 使用地图条目集的内容对其进行初始化,2)就地排序和 3) 迭代它,ArrayList 的工作速度大大加快。对于 Java 8.

中的步骤 2) 尤其如此