How can I have multiple mappers and reducers?

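I have this code, in which I set one mapper and one reducer. I want to add one more mapper and one more reducer to do further work. The problem is that I have to pass the output file of the first MapReduce job as the input to the next MapReduce job. Is this possible? If so, how can I do it?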

public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), DecisionTreec45.class);
    conf.setJobName("c4.5");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(MyMapper.class);
    conf.setReducerClass(MyReducer.class);

    // set your input file path below
    FileInputFormat.setInputPaths(conf, "/home/hduser/Id3_hds/playtennis.txt");
    FileOutputFormat.setOutputPath(conf, new Path("/home/hduser/Id3_hds/1/output" + current_index));
    JobClient.runJob(conf);
    return 0;
}

Yes, this can be done. The following tutorial shows how chaining works: http://gandhigeet.blogspot.com/2012/12/as-discussed-in-previous-post-hadoop.html

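Make sure you delete the intermediate output data that each MR phase creates in HDFS, using fs.delete(intermediateoutputPath); otherwise a later run fails, because Hadoop refuses to start a job whose output directory already exists. The one-argument delete is deprecated in current Hadoop releases; fs.delete(intermediateoutputPath, true) is the recursive, non-deprecated form.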
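The cleanup pattern can be tried without a cluster. The sketch below is a plain-Java, local-filesystem analogy and not Hadoop code: `LocalChainSketch`, `stage1`, `stage2`, `runChain` and `deleteRecursively` are hypothetical names made up for illustration, and `deleteRecursively` only stands in for what `fs.delete(path, true)` does on HDFS. It assumes a first stage that counts words and a second stage that groups words by count.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LocalChainSketch {

    // Local stand-in for fs.delete(path, true): remove a directory tree if it exists.
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> tree = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            for (Path p : tree.sorted(Comparator.reverseOrder()).collect(Collectors.toList())) {
                Files.delete(p);
            }
        }
    }

    // Stage 1 (hypothetical): count words and write "word<TAB>count" lines.
    static void stage1(List<String> lines, Path outDir) throws IOException {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        Files.createDirectories(outDir);
        List<String> out = counts.entrySet().stream()
                .map(e -> e.getKey() + "\t" + e.getValue())
                .collect(Collectors.toList());
        Files.write(outDir.resolve("part-00000"), out);
    }

    // Stage 2 (hypothetical): read stage 1's output and group words by count.
    static Map<Integer, List<String>> stage2(Path inDir) throws IOException {
        Map<Integer, List<String>> byCount = new TreeMap<>();
        for (String line : Files.readAllLines(inDir.resolve("part-00000"))) {
            String[] kv = line.split("\t");
            byCount.computeIfAbsent(Integer.parseInt(kv[1]), k -> new ArrayList<>()).add(kv[0]);
        }
        return byCount;
    }

    // Run both stages in order and always remove the intermediate directory.
    static Map<Integer, List<String>> runChain(List<String> lines, Path intermediate)
            throws IOException {
        deleteRecursively(intermediate);      // stale output from an earlier run
        try {
            stage1(lines, intermediate);
            return stage2(intermediate);      // stage 2 consumes stage 1's output
        } finally {
            deleteRecursively(intermediate);  // no longer needed after stage 2
        }
    }

    public static void main(String[] args) throws IOException {
        Path intermediate = Files.createTempDirectory("chain").resolve("intermediate_output");
        System.out.println(runChain(Arrays.asList("a b a", "b c a"), intermediate));
    }
}
```

Deleting in a `finally` block is a design choice for this sketch: the intermediate directory disappears even when the second stage throws. In a real driver you might prefer to keep it after a failure so it can be inspected.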

Here is how it works: you need two jobs, where job 2 depends on job 1.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// MyMapper1, MyReducer1, MyMapper2 and MyReducer2 are your own classes.
public class ChainJobs extends Configured implements Tool {

    // Job 1 writes here and job 2 reads from here.
    private static final String OUTPUT_PATH = "intermediate_output";

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path intermediate = new Path(OUTPUT_PATH);

        // A job fails if its output directory already exists, so remove
        // intermediate output left over from a previous run.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(intermediate, true);

        /*
         * Job 1
         */
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "Job1");
        job.setJarByClass(ChainJobs.class);

        job.setMapperClass(MyMapper1.class);
        job.setReducerClass(MyReducer1.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, intermediate);

        // waitForCompletion blocks until job 1 finishes. Job 2 depends on
        // job 1's output, so stop here if job 1 failed.
        if (!job.waitForCompletion(true)) {
            return 1;
        }

        /*
         * Job 2
         */
        Job job2 = Job.getInstance(conf, "Job 2");
        job2.setJarByClass(ChainJobs.class);

        job2.setMapperClass(MyMapper2.class);
        job2.setReducerClass(MyReducer2.class);

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        // Job 2 reads what job 1 wrote.
        TextInputFormat.addInputPath(job2, intermediate);
        TextOutputFormat.setOutputPath(job2, new Path(args[1]));

        return job2.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Reads the input and output paths from the command line and runs
     * both jobs to completion.
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: ChainJobs <input directory> <output location>");
            System.exit(2);
        }
        // Pass the result of run() on as the process exit code.
        System.exit(ToolRunner.run(new Configuration(), new ChainJobs(), args));
    }
}