前 N 个项目的 Map Reduce

Map Reduce for Top N Items

我在 Java 从事 Hadoop 项目,遇到了一些困难。我明白我应该做的事情的目标,但真的不明白如何实现它。我正在尝试从 map reduce 作业中提取前 N 个结果,例如前 5 个最高频率值。

我知道这通常需要两个 map reduce,一个用于 reduce,一个用于对值进行排序。然而,就像我说的,我对如何实际实现这个很迷茫。

我使用的代码是一个相当标准的 map reduce 代码,带有一些针对特殊值的过滤。

public class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
{
      private Text wordToken = new Text();
      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
      {
          StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\^=\[\]\*/\\,;,.\-:()?!\"']"); //Dividing String into tokens
        while (tokens.hasMoreTokens())
        {
          wordToken.set(tokens.nextToken());
          context.write(wordToken, new IntWritable(1));
        }
      }
    }

减速机

public class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
{
      private IntWritable count = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
      {
        int valueSum = 0;
        for (IntWritable val : values)
        {
          valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
      }
    }

Driver

public class WordCount {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
          System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
          System.exit(2);
        }
        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
          FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
      }
    }

如果有人可以帮助我,我将不胜感激。正如我所说,我知道我需要两个 map reduce,但不太确定如何开始。我尝试了在 Whosebug 上找到的其他几个解决方案,但对我的案例来说运气不佳。非常感谢!

您确实是对的,您确实需要将两个 MapReduce 作业链接在一起。更具体地说,您需要:

  • 一个计算输入文档中存储的每个单词的单词数的作业,

  • 还有一项工作能够“排序”所有这些单词和字数,以便挑选和输出其中最前面的 N

第一份工作与您已经提出的工作非常相似,所以我将重点介绍第二份工作,以便更清楚地了解 TopN 在 MapReduce 范例中的工作方式。

将 TopN MR 作业视为一个独立的作业,我们知道这个特定的作业将收到一堆键值对,其中最后一步中的每个词都将成为键,其字数将是成为价值。由于映射器和缩减器是 mapreduce 函数 运行 并行的孤立实例,我们需要找到一种方法来首先在本地找到 TopN 词(即对于每个映射器),并且然后将所有这些本地 TopN 结果分组,以找到输入给应用程序的所有数据的“全局”TopN 词。

因此,TopNMapper 首先必须在 setup 函数(因此在创建映射器实例之前),每个映射器将初始化它的一个对象并将每个单词及其字数作为元素。对于这种类型的计算(TopN),我们将把单词数作为键,将单词作为值,以得到一个按升序排序的单词列表。由于我们在这里只需要找出单词的前 N 个,可以肯定地说我们只需要每个映射器的前 N 个单词,因此我们可以删除下面的所有其他元素并且有 TreeMapN 元素,这些元素将在映射器执行结束时提供给 reducers(即通过 cleanup 函数)。映射器将编写键值对,其中单词将成为键,它们的字数将成为值,如下所示:

<word, wordcount>

现在对于 TopNReducer,我们需要再次使用 TreeMap 数据结构做同样的事情,用所有本地 TopN 元素填充它,删除不是的元素其中的前 N 个,并将单词及其字数作为输出。为了使该方法更“干净”,我们可以“反转”键值对结构中的单词和单词计数,这样我们就可以将单词计数作为键,将单词作为值。这导致(升序)排序的键值对数量将在完成此作业后存储在磁盘中,如下所示:

wordcount, word>

可以在 2 个 MR 作业中执行此类操作的程序如下所示(我们将 N 设置为 main 函数中的全局 Configuration 值,其中 conf.set("N", "10"); 命令,并在 TopNMapperTopNReducer classes 的 setup 函数中访问它,所有 classes 都被放置为简单起见,在一个 class TopNWordCount 中:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;


public class TopNWordCount
{
    /* input:  <document, contents>
     * output: <word, 1>
     */
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // clean up the document text and split the words into an array
            String[] words = value.toString()
                                .replaceAll("\d+", "")           // get rid of numbers...
                                .replaceAll("[^a-zA-Z ]", " ")    // get rid of punctuation...
                                .toLowerCase()                                      // turn every letter to lowercase...
                                .trim()                                             // trim the spaces
                                .replaceAll("\s+", " ")
                                .split(" ");

            // write every word as key with `1` as value that indicates that the word is
            // found at least 1 time inside the input text
            for(String word : words)
                context.write(new Text(word), one);
        }
    }
    
    /* input: <word, 1>
     * output: <word, wordcount>
     */
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable wordcount = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int word_cnt = 0;

            for(IntWritable value : values)
                word_cnt += value.get();

            wordcount.set(word_cnt);

            context.write(key, wordcount);
        }
    }



    /* input:  <word, wordcount>
     * output: <NULL, (word, wordcount)> (with the local topN words)
     */
    public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private int n;  // the N of TopN
        private TreeMap<Integer, String> word_list; // local list with words sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void map(Object key, Text value, Context context)
        {
            String[] line = value.toString().split("\t");   // split the word and the wordcount

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            word_list.put(Integer.valueOf(line[1]), line[0]);

            // if the local word list is populated with more than N elements
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN local words before continuing to TopNReducer
            // with each word as key and its wordcount as value
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }

    /* input:  <word, wordcount> (with the local topN words)
     * output: <wordcount, word> (with the global topN words)
     */
    public static class TopNReducer extends Reducer<Text, IntWritable, IntWritable, Text>
    {
        private int n;  // the N of TopN
        private TreeMap<Integer, String> word_list; //  list with words globally sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
        {
            int wordcount = 0;

            // get the one and only value (aka the wordcount) for each word
            for(IntWritable value : values)
                wordcount = value.get();

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            word_list.put(wordcount, key.toString());

            // if the global word list is populated with more than N elements
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN global words with each word as key and its wordcount as value
            // so the output will be sorted by the wordcount
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new IntWritable(entry.getKey()), new Text(entry.getValue()));
            }
        }
    }


    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.set("N", "10"); // set the N as a "public" value in the current Configuration

        if (pathArgs.length < 2)
        {
          System.err.println("MR Project Usage: TopNWordCount <input-path> [...] <output-path>");
          System.exit(2);
        }

        Path wordcount_dir = new Path("wordcount");
        Path output_dir = new Path(pathArgs[pathArgs.length - 1]);

        // if the in-between and output directories exists, delete them
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(wordcount_dir))
            fs.delete(wordcount_dir, true);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        Job wc_job = Job.getInstance(conf, "WordCount");
        wc_job.setJarByClass(TopNWordCount.class);
        wc_job.setMapperClass(WordCountMapper.class);
        wc_job.setReducerClass(WordCountReducer.class);
        wc_job.setMapOutputKeyClass(Text.class);
        wc_job.setMapOutputValueClass(IntWritable.class);
        wc_job.setOutputKeyClass(Text.class);
        wc_job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
          FileInputFormat.addInputPath(wc_job, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wc_job, wordcount_dir);
        wc_job.waitForCompletion(true);
        
        Job topn_job = Job.getInstance(conf, "TopN");
        topn_job.setJarByClass(TopNWordCount.class);
        topn_job.setMapperClass(TopNMapper.class);
        topn_job.setReducerClass(TopNReducer.class);
        topn_job.setMapOutputKeyClass(Text.class);
        topn_job.setMapOutputValueClass(IntWritable.class);
        topn_job.setOutputKeyClass(IntWritable.class);
        topn_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(topn_job, wordcount_dir);
        FileOutputFormat.setOutputPath(topn_job, output_dir);
        topn_job.waitForCompletion(true);
    }
}

此程序的输出(使用 this 目录和文本文件作为输入)如下:

请注意,这里的前 10 个单词是 stopwords (like the, to, etc.), as we should expect. If you want to filter out those stopwords, you can of course use TF-IDF and implement it in Hadoop with a lot of ways like this 例如一个。