MapReduce倒排索引程序
MapReduce Inverted Index Program
为什么我不能将 values.next() 返回的 IntWritable 对象存入元素类型同为 IntWritable 的 HashSet?(参见 REDUCER CLASS)
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class LineIndexer{
映射器CLASS
public static class LineIndexMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static Text word = new Text();
private final static Text location = new Text();
public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
// get the filename where this line came from
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
IntWritable fileNo = new IntWritable(Integer.parseInt(fileSplit.getPath().getName()));
String line = val.toString();
StringTokenizer itr = new StringTokenizer(line.toLowerCase());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, fileNo);
}
}
}
减速机CLASS
public static class LineIndexReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, Text> {
private final static HashSet<IntWritable> files = new HashSet<IntWritable>();
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
files.clear();
int count=0;
StringBuilder toReturn = new StringBuilder();
StringBuilder keyfreq = new StringBuilder();
System.out.println("values"+values);
while (values.hasNext()){
//String filename = values.next().toString();
System.out.println("value.next"+values.next());
if( !(files.contains(values.next()))){
files.add(values.next());
if (count!=0)
toReturn.append("-> ");
count++;
toReturn.append(values.next());
}
}
IntWritable freq = new IntWritable(count);
keyfreq.append(key.toString());
keyfreq.append("|");
keyfreq.append(freq);
output.collect(new Text(keyfreq.toString()), new Text(toReturn.toString()));
}
}
运行 方法
public static void run(String input, String output){
JobClient client = new JobClient();
JobConf conf = new JobConf(LineIndexer.class);
conf.setJobName("InvertedIndex");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
conf.setMapperClass(LineIndexMapper.class);
conf.setReducerClass(LineIndexReducer.class);
client.setConf(conf);
try {
JobClient.runJob(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
主要方法
public static void main(String[] args) {
if( args.length != 2 ){
System.err.println("InvertedIndex <input_dir> <output_dir>");
}else{
run(args[0], args[1]);
}
}
}
错误:
For lines 49 and 50 of LineIndexReducer class
line 49 :if( !(files.contains(values.next()))){
line 50: files.add(values.next());
java.util.NoSuchElementException: iterate past last value
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:121)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:250)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:246)
    at LineIndexer$LineIndexReducer.reduce(LineIndexer.java:49)
    at LineIndexer$LineIndexReducer.reduce(LineIndexer.java:1)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
每次循环迭代应当只调用一次 next():
while (values.hasNext()){
IntWritable filename = values.next();
System.out.println("value.next" + filename );
if( !(files.contains(filename))){
files.add(filename);
if (count!=0)
toReturn.append("-> ");
count++;
toReturn.append(filename);
}
}
你的 MapReduce 代码还有一个问题——并非结果不正确,而是会给系统带来严重的性能开销。在 map 函数中:
public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
IntWritable fileNo = new IntWritable(Integer.parseInt(fileSplit.getPath().getName()));
}
这段代码会对每条记录执行一次,但其结果在同一个 InputSplit 内并不会改变。应将其移到只执行一次的初始化方法中(旧 mapred API 为 configure(JobConf),新 mapreduce API 才是 setup())。
为什么我不能将 values.next() 返回的 IntWritable 对象存入元素类型同为 IntWritable 的 HashSet?(参见 REDUCER CLASS)
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class LineIndexer{
映射器CLASS
public static class LineIndexMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static Text word = new Text();
private final static Text location = new Text();
public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
// get the filename where this line came from
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
IntWritable fileNo = new IntWritable(Integer.parseInt(fileSplit.getPath().getName()));
String line = val.toString();
StringTokenizer itr = new StringTokenizer(line.toLowerCase());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, fileNo);
}
}
}
减速机CLASS
public static class LineIndexReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, Text> {
private final static HashSet<IntWritable> files = new HashSet<IntWritable>();
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
files.clear();
int count=0;
StringBuilder toReturn = new StringBuilder();
StringBuilder keyfreq = new StringBuilder();
System.out.println("values"+values);
while (values.hasNext()){
//String filename = values.next().toString();
System.out.println("value.next"+values.next());
if( !(files.contains(values.next()))){
files.add(values.next());
if (count!=0)
toReturn.append("-> ");
count++;
toReturn.append(values.next());
}
}
IntWritable freq = new IntWritable(count);
keyfreq.append(key.toString());
keyfreq.append("|");
keyfreq.append(freq);
output.collect(new Text(keyfreq.toString()), new Text(toReturn.toString()));
}
}
运行 方法
public static void run(String input, String output){
JobClient client = new JobClient();
JobConf conf = new JobConf(LineIndexer.class);
conf.setJobName("InvertedIndex");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
conf.setMapperClass(LineIndexMapper.class);
conf.setReducerClass(LineIndexReducer.class);
client.setConf(conf);
try {
JobClient.runJob(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
主要方法
public static void main(String[] args) {
if( args.length != 2 ){
System.err.println("InvertedIndex <input_dir> <output_dir>");
}else{
run(args[0], args[1]);
}
}
}
错误:
For lines 49 and 50 of LineIndexReducer class
line 49 :if( !(files.contains(values.next()))){
line 50: files.add(values.next());
java.util.NoSuchElementException: iterate past last value
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:121)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:250)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:246)
    at LineIndexer$LineIndexReducer.reduce(LineIndexer.java:49)
    at LineIndexer$LineIndexReducer.reduce(LineIndexer.java:1)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
每次循环迭代应当只调用一次 next():
while (values.hasNext()){
IntWritable filename = values.next();
System.out.println("value.next" + filename );
if( !(files.contains(filename))){
files.add(filename);
if (count!=0)
toReturn.append("-> ");
count++;
toReturn.append(filename);
}
}
你的 MapReduce 代码还有一个问题——并非结果不正确,而是会给系统带来严重的性能开销。在 map 函数中:
public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
IntWritable fileNo = new IntWritable(Integer.parseInt(fileSplit.getPath().getName()));
}
这段代码会对每条记录执行一次,但其结果在同一个 InputSplit 内并不会改变。应将其移到只执行一次的初始化方法中(旧 mapred API 为 configure(JobConf),新 mapreduce API 才是 setup())。