Hadoop Reducer 不工作
Hadoop Reducer does not work
我在处理 MapReduce 作业时遇到问题。我的 map
函数执行 运行 并产生所需的输出。但是,reduce
函数不会 运行。似乎该函数永远不会被调用。我使用 Text 作为键,使用 Text 作为值。但我不认为这会导致问题。
输入文件格式如下:
2015-06-06,2015-06-06,40.80239868164062,-73.93379211425781,40.72591781616211,-73.98358154296875,7.71,35.72
2015-06-06,2015-06-06,40.71020126342773,-73.96302032470703,40.72967529296875,-74.00226593017578,3.11,2.19
2015-06-05,2015-06-05,40.68404388427734,-73.97597503662109,40.67932510375977,-73.95581817626953,1.13,1.29
...
我想提取一行的第二个日期作为 Text
并将其用作 reduce 的键。键的值将是同一行中最后两个 float
值的组合。
即:2015-06-06 7.71 35.72
2015-06-06 9.71 66.72
这样值部分就可以看成两列用空格隔开。
这确实有效,我得到了一个包含许多相同键但不同值的输出文件。
现在我想对每个键的两个浮点列求和,这样在 reduce 之后我得到一个日期作为键,将求和的列作为值。
问题:reduce 没有 运行。
查看下面的代码:
映射器
public class Aggregate {
public static class EarnDistMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String [] splitResult = value.toString().split(",");
String dropOffDate = "";
String compEarningDist = "";
//dropoffDate at pos 1 as key
dropOffDate = splitResult[1];
//distance at pos length-2 and earnings at pos length-1 as values separated by space
compEarningDist = splitResult[splitResult.length -2] + " " + splitResult[splitResult.length-1];
context.write(new Text(dropOffDate), new Text(compEarningDist));
}
}
减速器
public static class EarnDistReducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {
float sumDistance = 0;
float sumEarnings = 0;
String[] splitArray;
while (values.hasNext()){
splitArray = values.next().toString().split("\s+");
//distance first
sumDistance += Float.parseFloat(splitArray[0]);
sumEarnings += Float.parseFloat(splitArray[1]);
}
//combine result to text
context.write(key, new Text(Float.toString(sumDistance) + " " + Float.toString(sumEarnings)));
}
}
工作
public static void main(String[] args) throws Exception{
// TODO
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Taxi dropoff");
job.setJarByClass(Aggregate.class);
job.setMapperClass(EarnDistMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(EarnDistReducer.class);
job.setReducerClass(EarnDistReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
感谢您的帮助!!
您 reduce
方法的签名有误。你有:
public void reduce(Text key, Iterator<Text> values, Context context) {
应该是:
public void reduce(Text key, Iterable<Text> values, Context context) {
我在处理 MapReduce 作业时遇到问题。我的 map
函数执行 运行 并产生所需的输出。但是,reduce
函数不会 运行。似乎该函数永远不会被调用。我使用 Text 作为键,使用 Text 作为值。但我不认为这会导致问题。
输入文件格式如下:
2015-06-06,2015-06-06,40.80239868164062,-73.93379211425781,40.72591781616211,-73.98358154296875,7.71,35.72
2015-06-06,2015-06-06,40.71020126342773,-73.96302032470703,40.72967529296875,-74.00226593017578,3.11,2.19
2015-06-05,2015-06-05,40.68404388427734,-73.97597503662109,40.67932510375977,-73.95581817626953,1.13,1.29
...
我想提取一行的第二个日期作为 Text
并将其用作 reduce 的键。键的值将是同一行中最后两个 float
值的组合。
即:2015-06-06 7.71 35.72
2015-06-06 9.71 66.72
这样值部分就可以看成两列用空格隔开。
这确实有效,我得到了一个包含许多相同键但不同值的输出文件。
现在我想对每个键的两个浮点列求和,这样在 reduce 之后我得到一个日期作为键,将求和的列作为值。
问题:reduce 没有 运行。
查看下面的代码:
映射器
public class Aggregate {
public static class EarnDistMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String [] splitResult = value.toString().split(",");
String dropOffDate = "";
String compEarningDist = "";
//dropoffDate at pos 1 as key
dropOffDate = splitResult[1];
//distance at pos length-2 and earnings at pos length-1 as values separated by space
compEarningDist = splitResult[splitResult.length -2] + " " + splitResult[splitResult.length-1];
context.write(new Text(dropOffDate), new Text(compEarningDist));
}
}
减速器
public static class EarnDistReducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {
float sumDistance = 0;
float sumEarnings = 0;
String[] splitArray;
while (values.hasNext()){
splitArray = values.next().toString().split("\s+");
//distance first
sumDistance += Float.parseFloat(splitArray[0]);
sumEarnings += Float.parseFloat(splitArray[1]);
}
//combine result to text
context.write(key, new Text(Float.toString(sumDistance) + " " + Float.toString(sumEarnings)));
}
}
工作
public static void main(String[] args) throws Exception{
// TODO
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Taxi dropoff");
job.setJarByClass(Aggregate.class);
job.setMapperClass(EarnDistMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(EarnDistReducer.class);
job.setReducerClass(EarnDistReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
感谢您的帮助!!
您 reduce
方法的签名有误。你有:
public void reduce(Text key, Iterator<Text> values, Context context) {
应该是:
public void reduce(Text key, Iterable<Text> values, Context context) {