使用 mapreduce 的第二大薪水 - 输出不符合预期
Second largest salary using mapreduce - Output is not as expected
我编写了一个小型 mapreduce 作业来查找数据集中第二高的薪水。我相信第二高的薪水逻辑是 correct.But 我得到了多个不正确的输出,应该只有一个带有名称的输出,例如 John,9000.And 输出也不正确,我在这里给出数据集和代码
hh,0,Jeet,3000
hk,1,Mayukh,4000
nn,2,Antara,3500
mm,3,Shubu,6000
ii,4,Parsi,8000
输出应该是 Shubu,6000,但我得到的是下面的输出
Antara -2147483648
Mayukh -2147483648
Parsi 3500
Shubu 4000
我使用的代码是
public class SecondHigestMapper extends Mapper<LongWritable,Text,Text,Text>{
private Text salary = new Text();
private Text name = new Text();
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
if(key.get()!=0){
String split[]= value.toString().split(",");
salary.set(split[2]+";"+split[3]);
name.set("ignore");
context.write(name,salary);
}
}
}
public class SecondHigestReducer extends Reducer<Text,Text,Text,IntWritable>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
int highest = 0;
int second_highest = 0;
int salary;
for(Text val:values){
String[] fn = val.toString().split("\;");
salary = Integer.parseInt(fn[3]);
if(highest < salary){
second_highest = highest;
highest =salary;
} else if(second_highest < salary){
second_highest = salary;
}
}
String seconHigest = String.valueOf(second_highest);
context.write(new Text(key),new Text(seconHigest));
}
}
public class SecondHigestDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf,"Second Higest Sal");
job.setJarByClass(SecondHigestDriver.class);
job.setMapperClass(SecondHigestMapper.class);
job.setCombinerClass(SecondHigestReducer.class);
job.setReducerClass(SecondHigestReducer.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我低于异常
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:20)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
请帮我解决这个问题
使用一个键将所有薪水强制放入一个 reducer
name.set("ignore"); // Could use a NullWritable
salary.set(split[2]+";"+split[3])); // change to TextWritable
context.write(name,salary); // need to change the signature of the mapper class
然后在 reducer 中,改变接受文本值的方法,然后将它们分开,投射工资,然后比较那些
我编写了一个小型 mapreduce 作业来查找数据集中第二高的薪水。我相信第二高的薪水逻辑是 correct.But 我得到了多个不正确的输出,应该只有一个带有名称的输出,例如 John,9000.And 输出也不正确,我在这里给出数据集和代码
hh,0,Jeet,3000
hk,1,Mayukh,4000
nn,2,Antara,3500
mm,3,Shubu,6000
ii,4,Parsi,8000
输出应该是 Shubu,6000,但我得到的是下面的输出
Antara -2147483648
Mayukh -2147483648
Parsi 3500
Shubu 4000
我使用的代码是
public class SecondHigestMapper extends Mapper<LongWritable,Text,Text,Text>{
private Text salary = new Text();
private Text name = new Text();
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
if(key.get()!=0){
String split[]= value.toString().split(",");
salary.set(split[2]+";"+split[3]);
name.set("ignore");
context.write(name,salary);
}
}
}
public class SecondHigestReducer extends Reducer<Text,Text,Text,IntWritable>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
int highest = 0;
int second_highest = 0;
int salary;
for(Text val:values){
String[] fn = val.toString().split("\;");
salary = Integer.parseInt(fn[3]);
if(highest < salary){
second_highest = highest;
highest =salary;
} else if(second_highest < salary){
second_highest = salary;
}
}
String seconHigest = String.valueOf(second_highest);
context.write(new Text(key),new Text(seconHigest));
}
}
public class SecondHigestDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf,"Second Higest Sal");
job.setJarByClass(SecondHigestDriver.class);
job.setMapperClass(SecondHigestMapper.class);
job.setCombinerClass(SecondHigestReducer.class);
job.setReducerClass(SecondHigestReducer.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我低于异常
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:20)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
请帮我解决这个问题
使用一个键将所有薪水强制放入一个 reducer
name.set("ignore"); // Could use a NullWritable
salary.set(split[2]+";"+split[3])); // change to TextWritable
context.write(name,salary); // need to change the signature of the mapper class
然后在 reducer 中,改变接受文本值的方法,然后将它们分开,投射工资,然后比较那些