执行 MapReduce 时使用 InverseMapper 和 IdentityReducer 时出错
Error Using InverseMapper and IdentityReducer while Executing MapReduce
所以我有一个巨大的访问日志文件,我正试图在服务器上找到命中率最高的路径。查找一条路径被命中的次数是一个传统的字数统计问题。
但是，由于 MR 作业的输出值未被排序（仅对键进行排序），我正在执行另一个 MR 作业：其映射器将上一个作业的输出作为输入，我使用 InverseMapper.java 反转键和值，并使用 Identity Reducer（Reducer.java），因为不需要聚合，我只需要对键（即第一个作业的值）进行排序。这是我的代码：
package edu.pitt.cloud.CloudProject;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable.DecreasingComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class AccessLogMostHitPath {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String configPath = "/usr/local/hadoop-2.7.3/etc/hadoop/";
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Path finalOutputPath = new Path(args[2]);
Configuration config = new Configuration(true);
config.addResource(new Path(configPath+"hdfs-site.xml"));
config.addResource(new Path(configPath+"core-site.xml"));
config.addResource(new Path(configPath+"yarn-site.xml"));
config.addResource(new Path(configPath+"mapred-site.xml"));
Job job = Job.getInstance(config, "AccessLogMostHitPath");
job.setJarByClass(AccessLogMostHitPath.class);
job.setMapperClass(AccessLogMostHitPathMapper.class);
job.setReducerClass(AccessLogMostHitPathReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
config.set("mapreduce.job.running.map.limit", "2");
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(1);
System.out.println("Starting Job Execution ::: AccessLogMostHitPath");
int code = job.waitForCompletion(true) ? 0 : 1;
System.out.println("Job Execution Finished ::: AccessLogMostHitPath");
if(code != 0){
System.out.println("First Job Failed");
System.exit(code);
}
FileSystem hdfs = FileSystem.get(config);
Path successPath = new Path(outputPath+"/_SUCCESS");
if (hdfs.exists(successPath))
hdfs.delete(successPath, true);
Job job2 = Job.getInstance(config, "AccessLogMostHitPathSort");
job2.setJarByClass(AccessLogMostHitPath.class);
job2.setMapperClass(InverseMapper.class);
job2.setReducerClass(Reducer.class);
//config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
KeyValueTextInputFormat.addInputPath(job2, outputPath);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
FileOutputFormat.setOutputPath(job2, finalOutputPath);
job2.setOutputFormatClass(TextOutputFormat.class);
job2.setNumReduceTasks(1);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setSortComparatorClass(DecreasingComparator.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);
config.set("mapreduce.job.running.map.limit", "2");
System.out.println("Starting Job Execution ::: AccessLogMostHitPathSort");
int code2 = job2.waitForCompletion(true) ? 0 : 1;
System.out.println("Job Execution Finished ::: AccessLogMostHitPathSort");
System.exit(code2);
}
}
执行此操作时出现以下异常:
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.lib.map.InverseMapper.map(InverseMapper.java:36)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
这是哪里出错了？我可以看到某个地方的键或值类型不匹配，但我已经交叉检查了所有内容。请帮助。
问题出在 KeyValueTextInputFormat。它是一种文本输入格式，会把键和值都读取为 Text。但是您声明 job2 的映射器输出类型是 IntWritable 和 Text：
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
所以你必须提供你自己的输入格式才能正确读取输入；或者更简单的做法是：不使用 InverseMapper，而是编写一个自定义 Mapper，在交换键值的同时把计数值从 Text 解析为 IntWritable（同时注意 LongWritable.DecreasingComparator 不能用于 IntWritable 键，需要为 IntWritable 提供降序比较器）。
所以我有一个巨大的访问日志文件,我正试图在服务器上找到命中率最高的路径。查找一条路径被命中的次数是一个传统的字数统计问题。
但是，由于 MR 作业的输出值未被排序（仅对键进行排序），我正在执行另一个 MR 作业：其映射器将上一个作业的输出作为输入，我使用 InverseMapper.java 反转键和值，并使用 Identity Reducer（Reducer.java），因为不需要聚合，我只需要对键（即第一个作业的值）进行排序。这是我的代码：
package edu.pitt.cloud.CloudProject;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable.DecreasingComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class AccessLogMostHitPath {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String configPath = "/usr/local/hadoop-2.7.3/etc/hadoop/";
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Path finalOutputPath = new Path(args[2]);
Configuration config = new Configuration(true);
config.addResource(new Path(configPath+"hdfs-site.xml"));
config.addResource(new Path(configPath+"core-site.xml"));
config.addResource(new Path(configPath+"yarn-site.xml"));
config.addResource(new Path(configPath+"mapred-site.xml"));
Job job = Job.getInstance(config, "AccessLogMostHitPath");
job.setJarByClass(AccessLogMostHitPath.class);
job.setMapperClass(AccessLogMostHitPathMapper.class);
job.setReducerClass(AccessLogMostHitPathReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
config.set("mapreduce.job.running.map.limit", "2");
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(1);
System.out.println("Starting Job Execution ::: AccessLogMostHitPath");
int code = job.waitForCompletion(true) ? 0 : 1;
System.out.println("Job Execution Finished ::: AccessLogMostHitPath");
if(code != 0){
System.out.println("First Job Failed");
System.exit(code);
}
FileSystem hdfs = FileSystem.get(config);
Path successPath = new Path(outputPath+"/_SUCCESS");
if (hdfs.exists(successPath))
hdfs.delete(successPath, true);
Job job2 = Job.getInstance(config, "AccessLogMostHitPathSort");
job2.setJarByClass(AccessLogMostHitPath.class);
job2.setMapperClass(InverseMapper.class);
job2.setReducerClass(Reducer.class);
//config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
KeyValueTextInputFormat.addInputPath(job2, outputPath);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
FileOutputFormat.setOutputPath(job2, finalOutputPath);
job2.setOutputFormatClass(TextOutputFormat.class);
job2.setNumReduceTasks(1);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setSortComparatorClass(DecreasingComparator.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);
config.set("mapreduce.job.running.map.limit", "2");
System.out.println("Starting Job Execution ::: AccessLogMostHitPathSort");
int code2 = job2.waitForCompletion(true) ? 0 : 1;
System.out.println("Job Execution Finished ::: AccessLogMostHitPathSort");
System.exit(code2);
}
}
执行此操作时出现以下异常:
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.lib.map.InverseMapper.map(InverseMapper.java:36)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
这是哪里出错了？我可以看到某个地方的键或值类型不匹配，但我已经交叉检查了所有内容。请帮助。
问题出在 KeyValueTextInputFormat。它是一种文本输入格式，会把键和值都读取为 Text。但是您声明 job2 的映射器输出类型是 IntWritable 和 Text：
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
所以你必须提供你自己的输入格式才能正确读取输入；或者更简单的做法是：不使用 InverseMapper，而是编写一个自定义 Mapper，在交换键值的同时把计数值从 Text 解析为 IntWritable（同时注意 LongWritable.DecreasingComparator 不能用于 IntWritable 键，需要为 IntWritable 提供降序比较器）。