Analyzing multiple input files and outputting only one file containing one final result

I am not very familiar with MapReduce. What I need is to produce a single line of output from the analysis of several input files. Right now my result contains one line per input file, so with 3 input files I get an output file with 3 lines, one result per input. Since I sort the results, I only need the first result written to the HDFS file. My code is below:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordLength {


    public static class Map extends Mapper<Object, Text, LongWritable, Text> {
        private int max = Integer.MIN_VALUE;   // longest word length seen by this map task
        private Text word = new Text();        // the longest word itself

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                         // one line of the input file
            StringTokenizer tokenizer = new StringTokenizer(line);  // split the line into words
            while (tokenizer.hasMoreTokens()) {
                String s = tokenizer.nextToken();
                int val = s.length();
                if (val > max) {
                    max = val;
                    word.set(s);
                }
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // emit a single (length, word) record per map task
            context.write(new LongWritable(max), word);
        }
    }

  public static class IntSumReducer
       extends Reducer<LongWritable,Text,Text,LongWritable> {

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // one reduce call per distinct length key, so this writes one line per key
      context.write(new Text("longest"), key);
    }
  }



  public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordLength.class);
        job.setMapperClass(Map.class);
        // sort the map output keys (word lengths) in decreasing order, largest first
        job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

}

It finds the longest word for each input file and prints it. But I need the longest length across all of the input files, printed on a single line.

So the output is:

longest 11

longest 10

longest 8

and I want it to contain only:

longest 11

Thanks
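
One way to get a single line from the job above, sketched here and untested: the job already sorts the LongWritable keys in decreasing order and runs a single reduce task, so a reducer can write only the first key it receives and ignore the rest. The class name EmitFirstReducer is my own, not from the original code:

  public static class EmitFirstReducer
       extends Reducer<LongWritable,Text,Text,LongWritable> {
    private boolean written = false;

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // keys arrive in decreasing order, so the first key is the global maximum
      if (!written) {
        context.write(new Text("longest"), key);
        written = true;
      }
    }
  }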

I changed the code that finds the longest word length, and now it prints only longest 11. If you have a better approach, feel free to correct my solution, as I am eager to learn the best option:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {


    public static class Map extends Mapper<Object, Text, Text, LongWritable> {
        private int max = Integer.MIN_VALUE;   // longest word length seen so far in this map task
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                         // one line of the input file
            StringTokenizer tokenizer = new StringTokenizer(line);  // split the line into words
            while (tokenizer.hasMoreTokens()) {
                String s = tokenizer.nextToken();
                int val = s.length();
                if (val > max) {
                    max = val;
                    word.set(s);
                    // emit only when a new local maximum is found
                    context.write(word, new LongWritable(val));
                }
            }
        }

    }


  public static class IntSumReducer
       extends Reducer<Text,LongWritable,Text,LongWritable> {
    private LongWritable result = new LongWritable();
    private long max = Long.MIN_VALUE;

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // track the global maximum across all keys; nothing is written here
      for (LongWritable val : values) {
        if (val.get() > max) {
          max = val.get();
        }
      }
      result.set(max);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
      // write the single final record once every key has been reduced
      context.write(new Text("longest"), result);
    }
  }



  public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }


}
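
A possible refinement of the updated version, sketched under the assumption that the job above runs as-is (LocalMaxMapper is a hypothetical name, and it reuses the imports already at the top of the file): instead of writing a record every time a mapper sees a new local maximum, the mapper can keep the maximum in a field and emit exactly one record per map task from cleanup(), all under the same key. The single reducer then receives just one value per mapper, so its reduce() runs once and could write the final line directly instead of waiting for cleanup().

public static class LocalMaxMapper extends Mapper<Object, Text, Text, LongWritable> {
    private long localMax = 0;   // longest word length seen by this map task

    @Override
    public void map(Object key, Text value, Context context) {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            // only update the local maximum; nothing is emitted per line
            localMax = Math.max(localMax, tokenizer.nextToken().length());
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // one record per map task; the shared key routes every partial
        // maximum to the same reduce() call
        context.write(new Text("longest"), new LongWritable(localMax));
    }
}

This cuts the shuffle down to one key/value pair per mapper and removes the need for the reducer to aggregate across many different word keys.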