Map Reduce Line Frequency of Words
I am currently working on a Hadoop project in Java. My objective is to make a map reduce that counts the line frequency of each word. That is, instead of outputting the exact number of times a word occurs in the input file, it should just count the number of lines the word occurs in. If a word occurs in a line more than once, it should only be counted once, since we are only counting how many lines it appears in. I have a basic map reduce working, which I will post below, but I am a little lost on how to count only the line frequency of words rather than the full word count. Any help would be much appreciated, thanks a lot.
MapWordCount
public class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
{
private Text wordToken = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']"); //Dividing String into tokens
while (tokens.hasMoreTokens())
{
wordToken.set(tokens.nextToken());
context.write(wordToken, new IntWritable(1));
}
}
}
ReduceWordCount
public class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
{
private IntWritable count = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int valueSum = 0;
for (IntWritable val : values)
{
valueSum += val.get();
}
count.set(valueSum);
context.write(key, count);
}
}
Driver Code
public class WordCount {
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (pathArgs.length < 2)
{
System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
System.exit(2);
}
Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
wcJob.setJarByClass(WordCount.class);
wcJob.setMapperClass(MapWordCount.class);
wcJob.setCombinerClass(ReduceWordCount.class);
wcJob.setReducerClass(ReduceWordCount.class);
wcJob.setOutputKeyClass(Text.class);
wcJob.setOutputValueClass(IntWritable.class);
for (int i = 0; i < pathArgs.length - 1; ++i)
{
FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
}
FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
}
}
Things are surprisingly simple for this use case in Hadoop's MapReduce, because Hadoop tends to read input documents line by line, even when FileInputFormat has been explicitly specified as the input data format of the MR job (this goes well beyond the scope of your question, but you can read up on mapping and file splits in Hadoop here and here).
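(If you want to make that default behavior explicit rather than rely on it silently, one extra line in the driver is enough; this is purely optional and the job below runs identically without it.)
// Optional, in the driver: TextInputFormat is the default line-oriented input format.
// It hands each map() call one line of the file as the value, with that line's byte offset as the key.
wcJob.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);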
Since each mapper instance will receive a single line as its input, the only things you need to worry about are:
1. splitting the text into words (after cleaning out punctuation and stray whitespace, lowercasing everything, etc.),
2. removing duplicates so that only the unique words of that line remain,
3. and emitting each unique word as a key with 1 as its value, classic WordCount style.
For step 2 you can use a HashSet, which (as you might have guessed) is a Java data structure that keeps only unique elements and ignores duplicates: load every token into it, then iterate over it to write the key-value pairs and send them to the reducer instances.
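As a quick standalone illustration of that deduplication step (a hypothetical HashSetDedupDemo class, not part of the job itself), loading the tokens into a HashSet is all it takes:
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class HashSetDedupDemo
{
    public static void main(String[] args)
    {
        // a made-up sample line: "hello" and "world" each appear more than once
        String[] tokens = "hello world hello world world how are you".split(" ");
        // the HashSet silently drops the duplicates, keeping one entry per distinct word
        Set<String> uniqueWords = new HashSet<String>(Arrays.asList(tokens));
        System.out.println(uniqueWords); // e.g. [world, how, you, are, hello] -- iteration order is not guaranteed
    }
}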
An application of this kind could look like the following (I changed the way the text is tokenized in the map function, because your version did not seem to split out every word, but rather split only between runs of punctuation):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.*;
public class LineFreq
{
public static class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
{
private Text wordToken = new Text();
private static final IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
// dividing String into tokens
String[] tokens = value.toString()
.replaceAll("\d+", "") // get rid of numbers...
.replaceAll("[^a-zA-Z ]", " ") // get rid of punctuation...
.toLowerCase() // turn every letter to lowercase...
.trim() // trim the spaces...
.replaceAll("\s+", " ")
.split(" ");
Set<String> word_set = new HashSet<String>(); // set to hold all of the unique words (WITHOUT DUPLICATES)
// add words to the word set, skipping any empty strings a blank line would produce
for(String word : tokens)
if (!word.isEmpty())
word_set.add(word);
// write each unique word to have one occurrence in this particular line
for(String word : word_set)
{
wordToken.set(word);
context.write(wordToken, one);
}
}
}
public static class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
{
private IntWritable count = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int valueSum = 0;
for (IntWritable val : values)
valueSum += val.get();
count.set(valueSum);
context.write(key, count);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (pathArgs.length < 2)
{
System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
System.exit(2);
}
Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
wcJob.setJarByClass(LineFreq.class);
wcJob.setMapperClass(MapWordCount.class);
wcJob.setCombinerClass(ReduceWordCount.class);
wcJob.setReducerClass(ReduceWordCount.class);
wcJob.setOutputKeyClass(Text.class);
wcJob.setOutputValueClass(IntWritable.class);
for (int i = 0; i < pathArgs.length - 1; ++i)
{
FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
}
FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
}
}
So we can test it with the following document as input:
hello world! hello! how are you, world?
i am fine! world world world! hello to you too!
what a wonderful world!
amazing world i must say, indeed
and confirm that the word frequencies are indeed counted per line, with the job producing the following output (keys come out sorted alphabetically):
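a	1
am	1
amazing	1
are	1
fine	1
hello	2
how	1
i	2
indeed	1
must	1
say	1
to	1
too	1
what	1
wonderful	1
world	4
you	2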