Cloudera 中的 WordCount 作业成功但 reducer 的输出与 mapper 的输出相同

WordCount job in Cloudera is successful but output of reducer is the same as output of mapper

这个程序是用Cloudera编写的。这是我创建的驱动程序class。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class WordCount2 
{
    public static void main(String[] args) throws Exception
    {
        if(args.length < 2)
        {
          System.out.println("Enter input and output path correctly ");
          System.exit(-1);//exit if error occurs
        }

        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf,"WordCount2");
        //Define MapReduce job

        //
        //job.setJobName("WordCount2");// job name created
        job.setJarByClass(WordCount2.class); //Jar file will be created

        //Set input/ouptput paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //Set input/output Format
        job.setInputFormatClass(TextInputFormat.class);// input format is of TextInput Type
        job.setOutputFormatClass(TextOutputFormat.class); // output format is of TextOutputType

        //set Mapper and Reducer class
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        //Set output key-value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //submit job
        System.exit(job.waitForCompletion(true)?0:1);// If job is completed exit successfully, else throw error

    }

}

下面是 Mapper 的代码 class。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;


public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{

    @Override
    public void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException
    {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while(tokenizer.hasMoreTokens())
        {
            String word= tokenizer.nextToken();
            context.write(new Text(word), new IntWritable(1));
        }


    }
}

//------------减速器Class------------

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class WordReducer extends Reducer <Text,IntWritable,Text,IntWritable>
{
    public void reduce(Text key,Iterator<IntWritable> values,Context context)
    throws IOException, InterruptedException
    {
        int sum = 0;
        while(values.hasNext())
        {

            sum  += values.next().get();
        }
        context.write(key, new IntWritable(sum));
    }

}

以下是命令行日志

[cloudera@quickstart workspace]$ hadoop jar wordcount2.jar WordCount2 /user/training/soni.txt /user/training/sonioutput2
18/04/23 07:17:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/23 07:17:24 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/23 07:17:25 INFO input.FileInputFormat: Total input paths to process : 1
18/04/23 07:17:25 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1281)
    at java.lang.Thread.join(Thread.java:1355)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:952)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:690)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:879)
18/04/23 07:17:26 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1281)
    at java.lang.Thread.join(Thread.java:1355)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:952)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:690)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:879)
18/04/23 07:17:26 INFO mapreduce.JobSubmitter: number of splits:1
18/04/23 07:17:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523897572171_0005
18/04/23 07:17:27 INFO impl.YarnClientImpl: Submitted application application_1523897572171_0005
18/04/23 07:17:27 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1523897572171_0005/
18/04/23 07:17:27 INFO mapreduce.Job: Running job: job_1523897572171_0005
18/04/23 07:17:45 INFO mapreduce.Job: Job job_1523897572171_0005 running in uber mode : false
18/04/23 07:17:45 INFO mapreduce.Job:  map 0% reduce 0%
18/04/23 07:18:01 INFO mapreduce.Job:  map 100% reduce 0%
18/04/23 07:18:16 INFO mapreduce.Job:  map 100% reduce 100%
18/04/23 07:18:17 INFO mapreduce.Job: Job job_1523897572171_0005 completed successfully
18/04/23 07:18:17 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=310
        FILE: Number of bytes written=251053
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=250
        HDFS: Number of bytes written=188
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=14346
        Total time spent by all reduces in occupied slots (ms)=12546
        Total time spent by all map tasks (ms)=14346
        Total time spent by all reduce tasks (ms)=12546
        Total vcore-milliseconds taken by all map tasks=14346
        Total vcore-milliseconds taken by all reduce tasks=12546
        Total megabyte-milliseconds taken by all map tasks=14690304
        Total megabyte-milliseconds taken by all reduce tasks=12847104
    Map-Reduce Framework
        Map input records=7
        Map output records=29
        Map output bytes=246
        Map output materialized bytes=310
        Input split bytes=119
        Combine input records=0
        Combine output records=0
        Reduce input groups=19
        Reduce shuffle bytes=310
        Reduce input records=29
        Reduce output records=29
        Spilled Records=58
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=1095
        CPU time spent (ms)=4680
        Physical memory (bytes) snapshot=407855104
        Virtual memory (bytes) snapshot=3016044544
        Total committed heap usage (bytes)=354553856
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=131
    File Output Format Counters 
        Bytes Written=188
[cloudera@quickstart workspace]$

下面是输入数据当前输入文件soni.txt:

Hi How are you
I am fine
What about you
What are you doing these days
How is your job going 
How is your family
My family is great

part-r-00000 文件中收到以下输出:

family  1
family  1
fine    1
going   1
great   1
is  1
is  1
is  1
job 1
these   1
you 1
you 1
you 1
your    1
your    1

但是,我认为这不应该是正确的输出。它应该给出准确的字数。

您的 reduce 方法签名错误,因此它永远不会被调用。您需要从 Reducer class:

覆盖这个
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException;

它是一个 Iterable 而不是 Iterator

试试这个:

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
        sum += value.get();
    }
    context.write(key, new IntWritable(sum));
}