Map Reduce Line Frequency of Words

I am currently working on a Hadoop project in Java. My objective is to write a MapReduce job that computes the line frequency of every word: instead of outputting the exact number of times a word occurs in the input file, it should count the number of lines the word appears in. If a word occurs more than once on a single line, it should only be counted once, since we only count the lines it appears in; for example, in the line "hello hello world", the word "hello" contributes only 1 to its count. I have a basic word-count MapReduce working, which I will post below, but I am a bit lost on how to count only the line frequency of each word instead of the full word count. Any help would be greatly appreciated, thanks so much.

Map Word Count

public class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
{
      private Text wordToken = new Text();
      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
      {
          // dividing the line into tokens, using punctuation, digits, and symbols as delimiters
          StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");
          while (tokens.hasMoreTokens())
          {
              wordToken.set(tokens.nextToken());
              context.write(wordToken, new IntWritable(1));
          }
      }
}

Reduce Word Count

public class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
{
      private IntWritable count = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
      {
        int valueSum = 0;
        for (IntWritable val : values)
        {
          valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
      }
}

Driver Code

public class WordCount {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
          System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
          System.exit(2);
        }
        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
          FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
      }
}

In Hadoop's MapReduce setting, things are surprisingly simple here, because Hadoop tends to read input documents line by line, even when FileInputFormat explicitly specifies the input data format of the MR job (this goes well beyond the scope of your question, but you can check out how mapping and file splits work in Hadoop here and here).

Since each mapper instance will receive a single line as its input, the only things you need to worry about are:

1. splitting the text into words (after cleaning up punctuation, stray spaces, converting everything to lowercase, etc.),

2. removing duplicates so that only the unique words of that line remain,

3. and writing each unique word as a key with 1 as its value, classic WordCount style.

For step 2 you can use a HashSet, which (as you might expect) is a Java data structure that keeps only unique elements and ignores duplicates. Load every token into it, then iterate over it to write the key-value pairs and send them to the reducer instances.
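As a quick standalone illustration of that deduplication step, here is a minimal sketch in plain Java, outside of Hadoop (the sample line and class name are made up):

import java.util.HashSet;
import java.util.Set;

public class DedupDemo
{
    public static void main(String[] args)
    {
        // a hypothetical input line, already cleaned up and lowercased
        String line = "hello world hello how are you world";

        Set<String> uniqueWords = new HashSet<String>();
        for(String token : line.split(" "))
            uniqueWords.add(token);     // add() silently ignores duplicates

        // prints every distinct word exactly once, e.g. [world, you, how, are, hello]
        System.out.println(uniqueWords);
    }
}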

An application of this kind could look like the following (I changed the way the text is tokenized in the map function, since your version didn't seem to split out every word, but only to split between punctuation marks):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.*;

public class LineFreq
{
    public static class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
    {
        private Text wordToken = new Text();
        private static final IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            // dividing String into tokens
            String[] tokens = value.toString()
                                .replaceAll("\\d+", "")          // get rid of numbers...
                                .replaceAll("[^a-zA-Z ]", " ")   // get rid of punctuation...
                                .toLowerCase()                   // turn every letter to lowercase...
                                .trim()                          // trim the spaces...
                                .replaceAll("\\s+", " ")         // collapse runs of spaces into one...
                                .split(" ");                     // and split on single spaces

            Set<String> word_set = new HashSet<String>();   // set holding the unique words of this line (no duplicates)

            // add the non-empty words to the word set; duplicates are ignored automatically
            for(String word : tokens)
                if(!word.isEmpty())
                    word_set.add(word);

            // write each unique word to have one occurrence in this particular line
            for(String word : word_set)
            {
                wordToken.set(word);
                context.write(wordToken, one);
            }

        }
    }

    public static class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
    {
        private IntWritable count = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int valueSum = 0;

            for (IntWritable val : values)
              valueSum += val.get();

            count.set(valueSum);
            context.write(key, count);
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(LineFreq.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
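
Once compiled and packaged into a jar, the job can be submitted in the usual way; the jar and directory names below are only placeholders:

hadoop jar linefreq.jar LineFreq /user/hduser/input /user/hduser/output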

So we can test it using the following document as input:

hello world! hello! how are you, world?
i am fine! world world world! hello to you too!
what a wonderful world!
amazing world i must say, indeed

and confirm that the word frequencies are indeed counted per line. The expected output (worked out by hand from the mapper and reducer logic above, with keys sorted alphabetically as Hadoop does by default) is:
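
a	1
am	1
amazing	1
are	1
fine	1
hello	2
how	1
i	2
indeed	1
must	1
say	1
to	1
too	1
what	1
wonderful	1
world	4
you	2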