Map Reduce for Top N Items
I'm working on a Hadoop project in Java and have hit a bit of a wall. I understand the goal of what I'm supposed to be doing, but I really don't understand how to implement it. I'm trying to extract the top N results from a map reduce job, for example the 5 values with the highest frequency.
I know that this will generally require two map reduces, one to reduce and one to sort the values. That said, like I mentioned, I'm fairly lost on how to actually implement this.
The code I'm using is a fairly standard map reduce job with some filtering for special values.
public class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Text wordToken = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // Dividing String into tokens
        StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\,;,.\\-:()?!\"']");
        while (tokens.hasMoreTokens())
        {
            wordToken.set(tokens.nextToken());
            context.write(wordToken, new IntWritable(1));
        }
    }
}
Reducer
public class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private IntWritable count = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int valueSum = 0;
        for (IntWritable val : values)
        {
            valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
    }
}
Driver
public class WordCount {
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }
        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
If anyone could help me out, I would greatly appreciate it. As I said, I know I need two map reduces, but I'm not sure how to get started. I've tried several other solutions I found on Stack Overflow, but haven't had much luck with them for my case. Thanks so much!
You are indeed right, you do need to chain two MapReduce jobs together. More specifically, you need:
a job that counts the word count for each word stored in the input documents,
and a job that is able to "sort" all those words and word counts in order to pick and output the top N of them.
The first job is very similar to the one you've already put forward, so I'll focus on the second one to make it clearer how TopN works within the MapReduce paradigm.
Treating the TopN MR job as a standalone job, we know that this particular job will receive a bunch of key-value pairs where each word from the previous step is the key and its word count is the value. Since mappers and reducers are isolated instances of the map and reduce functions running in parallel, we need to find a way to first compute the TopN words locally (i.e. within each mapper), and then group all of those local TopN results together to find the "global" TopN words of all the data given as input to the application.
So the TopNMapper first has to create a TreeMap (a map that keeps its entries sorted by key) in the setup function, which runs once per mapper instance before any of its map calls, and each mapper fills its own TreeMap with every word and its word count as an element. For this type of computation (TopN) we put the word count as the key and the word as the value, so that the TreeMap keeps the words sorted by count in ascending order. Since we only need the top N words here, it's safe to say that we only want the top N words of each mapper, so we can remove all of the other elements below them and keep a TreeMap of just N elements, which is handed over to the reducers at the end of each mapper's execution (i.e. through the cleanup function). The mappers will write key-value pairs where the words are the keys and their word counts are the values, like this:
<word, wordcount>
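Before looking at the full program, here is a minimal standalone sketch of the TreeMap trick described above (plain Java outside of Hadoop, with made-up words and counts), i.e. keeping only the N entries with the largest counts:

import java.util.Map;
import java.util.TreeMap;

public class TopNSketch
{
    public static void main(String[] args)
    {
        int n = 3;  // keep only the 3 most frequent words
        TreeMap<Integer, String> wordList = new TreeMap<Integer, String>(); // sorted by count, ascending

        // made-up <word, wordcount> pairs standing in for the first job's output
        String[] words = {"the", "hadoop", "map", "to", "tree", "job"};
        int[] counts = {12, 4, 7, 25, 1, 9};

        for (int i = 0; i < words.length; i++)
        {
            wordList.put(counts[i], words[i]);  // count as key, so the TreeMap sorts by it
            if (wordList.size() > n)
                wordList.remove(wordList.firstKey());   // drop the entry with the smallest count
        }

        // prints the local top 3 in ascending order: 9 job, 12 the, 25 to
        for (Map.Entry<Integer, String> entry : wordList.entrySet())
            System.out.println(entry.getKey() + "\t" + entry.getValue());
    }
}

One caveat of this trick (both in the sketch and in the jobs below) is that the count is used as the TreeMap key, so two words with exactly the same count overwrite each other; for a rough TopN over large texts this is usually acceptable, but it is worth knowing.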
Now for the TopNReducer, we need to do the same thing once more with a TreeMap, filling it with all of the local TopN elements, removing those that are not among the top N of them, and writing the words and their word counts as output. To make this approach "cleaner", we can "reverse" the word and word count in the key-value pair structure, so that the word counts become the keys and the words become the values. This results in an (ascending) sorted set of key-value pairs being stored on disk once this job is done, like this:
<wordcount, word>
A program that performs this kind of operation in 2 MR jobs looks like the one below (we set N as a global Configuration value in the main function with the conf.set("N", "10"); command, and access it in the setup functions of the TopNMapper and TopNReducer classes; for simplicity, all of the classes are placed inside a single class, TopNWordCount):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
public class TopNWordCount
{
    /* input: <document, contents>
     * output: <word, 1>
     */
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // clean up the document text and split the words into an array
            String[] words = value.toString()
                    .replaceAll("\\d+", "")         // get rid of numbers...
                    .replaceAll("[^a-zA-Z ]", " ")  // get rid of punctuation...
                    .toLowerCase()                  // turn every letter to lowercase...
                    .trim()                         // trim the spaces
                    .replaceAll("\\s+", " ")        // collapse multiple spaces into one
                    .split(" ");

            // write every word as key with `1` as value that indicates that the word is
            // found at least 1 time inside the input text
            for (String word : words)
                context.write(new Text(word), one);
        }
    }

    /* input: <word, 1>
     * output: <word, wordcount>
     */
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable wordcount = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int word_cnt = 0;
            for (IntWritable value : values)
                word_cnt += value.get();
            wordcount.set(word_cnt);
            context.write(key, wordcount);
        }
    }

    /* input: <word, wordcount>
     * output: <word, wordcount> (with the local topN words)
     */
    public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private int n; // the N of TopN
        private TreeMap<Integer, String> word_list; // local list with words sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N")); // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void map(Object key, Text value, Context context)
        {
            String[] line = value.toString().split("\t"); // split the word and the wordcount

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            word_list.put(Integer.valueOf(line[1]), line[0]);

            // if the local word list is populated with more than N elements
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN local words before continuing to TopNReducer
            // with each word as key and its wordcount as value
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }

    /* input: <word, wordcount> (with the local topN words)
     * output: <wordcount, word> (with the global topN words)
     */
    public static class TopNReducer extends Reducer<Text, IntWritable, IntWritable, Text>
    {
        private int n; // the N of TopN
        private TreeMap<Integer, String> word_list; // list with words globally sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N")); // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
        {
            int wordcount = 0;

            // get the one and only value (aka the wordcount) for each word
            for (IntWritable value : values)
                wordcount = value.get();

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            word_list.put(wordcount, key.toString());

            // if the global word list is populated with more than N elements
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN global words with each word as key and its wordcount as value
            // so the output will be sorted by the wordcount
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new IntWritable(entry.getKey()), new Text(entry.getValue()));
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.set("N", "10"); // set the N as a "public" value in the current Configuration

        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: TopNWordCount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Path wordcount_dir = new Path("wordcount");
        Path output_dir = new Path(pathArgs[pathArgs.length - 1]);

        // if the in-between and output directories exist, delete them
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(wordcount_dir))
            fs.delete(wordcount_dir, true);
        if (fs.exists(output_dir))
            fs.delete(output_dir, true);

        Job wc_job = Job.getInstance(conf, "WordCount");
        wc_job.setJarByClass(TopNWordCount.class);
        wc_job.setMapperClass(WordCountMapper.class);
        wc_job.setReducerClass(WordCountReducer.class);
        wc_job.setMapOutputKeyClass(Text.class);
        wc_job.setMapOutputValueClass(IntWritable.class);
        wc_job.setOutputKeyClass(Text.class);
        wc_job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wc_job, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wc_job, wordcount_dir);
        wc_job.waitForCompletion(true);

        Job topn_job = Job.getInstance(conf, "TopN");
        topn_job.setJarByClass(TopNWordCount.class);
        topn_job.setMapperClass(TopNMapper.class);
        topn_job.setReducerClass(TopNReducer.class);
        topn_job.setMapOutputKeyClass(Text.class);
        topn_job.setMapOutputValueClass(IntWritable.class);
        topn_job.setOutputKeyClass(IntWritable.class);
        topn_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(topn_job, wordcount_dir);
        FileOutputFormat.setOutputPath(topn_job, output_dir);
        topn_job.waitForCompletion(true);
    }
}
The output of this program (using this directory of text files as input) looks like the following:
Notice that the top 10 words here are stopwords (like the, to, etc.), as we should expect. If you want to filter out those stopwords, you can of course use TF-IDF and implement it in Hadoop in any of a number of ways, like this one for example.
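If you just want something lightweight instead of full TF-IDF, one simpler option (not part of the answer above, just an illustrative sketch with a tiny, made-up stopword set) is to skip stopwords directly inside the word-count mapper, for example:

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// hypothetical variant of WordCountMapper that ignores a few common stopwords
public class StopwordFilterMapper extends Mapper<Object, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1);

    // tiny illustrative stopword set; a real list would be much longer
    private final Set<String> stopwords = new HashSet<String>(
            Arrays.asList("the", "to", "of", "and", "a", "in", "is", "it"));

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        // same cleanup as in WordCountMapper above
        String[] words = value.toString()
                .replaceAll("\\d+", "")
                .replaceAll("[^a-zA-Z ]", " ")
                .toLowerCase()
                .trim()
                .replaceAll("\\s+", " ")
                .split(" ");

        for (String word : words)
        {
            if (!word.isEmpty() && !stopwords.contains(word)) // skip empty tokens and stopwords
                context.write(new Text(word), one);
        }
    }
}

A real stopword list would usually be loaded from a file (for example via the job's distributed cache) rather than hard-coded like in this sketch.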