WordCount job in Cloudera is successful but output of reducer is the same as output of mapper
This program was written in the Cloudera environment. Here is the driver class I created.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount2
{
    public static void main(String[] args) throws Exception
    {
        if (args.length < 2)
        {
            System.out.println("Enter input and output path correctly ");
            System.exit(-1); // exit if the paths are missing
        }

        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "WordCount2"); // define the MapReduce job
        // job.setJobName("WordCount2");
        job.setJarByClass(WordCount2.class); // jar containing the driver class

        // Set input/output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Set input/output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Set Mapper and Reducer classes
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        // Set output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit the job and exit with 0 on success, 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Below is the code for the Mapper class.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens())
        {
            String word = tokenizer.nextToken();
            context.write(new Text(word), new IntWritable(1)); // emit (word, 1)
        }
    }
}
//------------ Reducer Class ------------
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce(Text key, Iterator<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        while (values.hasNext())
        {
            sum += values.next().get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Below is the command-line log:
[cloudera@quickstart workspace]$ hadoop jar wordcount2.jar WordCount2 /user/training/soni.txt /user/training/sonioutput2
18/04/23 07:17:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/23 07:17:24 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/23 07:17:25 INFO input.FileInputFormat: Total input paths to process : 1
18/04/23 07:17:25 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:952)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:690)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:879)
18/04/23 07:17:26 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:952)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:690)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:879)
18/04/23 07:17:26 INFO mapreduce.JobSubmitter: number of splits:1
18/04/23 07:17:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523897572171_0005
18/04/23 07:17:27 INFO impl.YarnClientImpl: Submitted application application_1523897572171_0005
18/04/23 07:17:27 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1523897572171_0005/
18/04/23 07:17:27 INFO mapreduce.Job: Running job: job_1523897572171_0005
18/04/23 07:17:45 INFO mapreduce.Job: Job job_1523897572171_0005 running in uber mode : false
18/04/23 07:17:45 INFO mapreduce.Job: map 0% reduce 0%
18/04/23 07:18:01 INFO mapreduce.Job: map 100% reduce 0%
18/04/23 07:18:16 INFO mapreduce.Job: map 100% reduce 100%
18/04/23 07:18:17 INFO mapreduce.Job: Job job_1523897572171_0005 completed successfully
18/04/23 07:18:17 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=310
FILE: Number of bytes written=251053
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=250
HDFS: Number of bytes written=188
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=14346
Total time spent by all reduces in occupied slots (ms)=12546
Total time spent by all map tasks (ms)=14346
Total time spent by all reduce tasks (ms)=12546
Total vcore-milliseconds taken by all map tasks=14346
Total vcore-milliseconds taken by all reduce tasks=12546
Total megabyte-milliseconds taken by all map tasks=14690304
Total megabyte-milliseconds taken by all reduce tasks=12847104
Map-Reduce Framework
Map input records=7
Map output records=29
Map output bytes=246
Map output materialized bytes=310
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=19
Reduce shuffle bytes=310
Reduce input records=29
Reduce output records=29
Spilled Records=58
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=1095
CPU time spent (ms)=4680
Physical memory (bytes) snapshot=407855104
Virtual memory (bytes) snapshot=3016044544
Total committed heap usage (bytes)=354553856
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=131
File Output Format Counters
Bytes Written=188
[cloudera@quickstart workspace]$
Below is the input data in the current input file soni.txt:
Hi How are you
I am fine
What about you
What are you doing these days
How is your job going
How is your family
My family is great
The following output was received in the part-r-00000 file:
family 1
family 1
fine 1
going 1
great 1
is 1
is 1
is 1
job 1
these 1
you 1
you 1
you 1
your 1
your 1
However, I don't think this is the correct output. It should give the exact count of each word.
Your reduce method has the wrong signature, so it is never called. The framework falls back to the default Reducer implementation, which simply writes every (key, value) pair through unchanged, and that is why the output looks the same as the mapper's output. You need to override this method from the Reducer class:
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException;
Note that it takes an Iterable, not an Iterator.
Try this:
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
        sum += value.get();
    }
    context.write(key, new IntWritable(sum));
}
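For reference, the full WordReducer could then look like the sketch below (based on your original class; only the reduce signature changes). Keeping the @Override annotation on the method is what exposes the mistake: with the Iterator parameter the compiler rejects the annotation, because that method does not actually override Reducer.reduce.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        // Sum the 1s emitted by the mapper for this word
        for (IntWritable value : values)
        {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

With this change the job counters should report 19 reduce output records (one per distinct word) instead of 29, and the output should contain aggregated counts such as is 3, you 3, and family 2.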