MapReduce Task jumping from 0% to 100% right away with no output
I am trying to implement a basic MapReduce program in Java to find the maximum temperature from a given dataset. My map tasks show incremental progress (0%, 10%, 18%, 27%, ... 100%), but my Reducer task jumps straight from 0% to 100%. Do you notice anything wrong with my code below?
The expected output is the weather station code and the maximum temperature.
Sample data:
Format: WeatherStationCode,YYYYMMDD,ELEMENT,TEMP (multiplied by 10),....
CA004018880,20010101,TMIN,-220,,,C,
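For reference, here is how that sample line splits into fields (the mapper below indexes into this array). Java's String.split with no limit drops trailing empty strings, so the line above yields seven fields; the SplitCheck class is just a throwaway illustration, not part of the job:

import java.util.Arrays;

public class SplitCheck {
    public static void main(String[] args) {
        // Sample line from the dataset; the trailing empty field after "C," is dropped by split(",").
        String[] fields = "CA004018880,20010101,TMIN,-220,,,C,".split(",");
        System.out.println(fields.length);          // 7
        System.out.println(Arrays.toString(fields)); // [CA004018880, 20010101, TMIN, -220, , , C]
        // index:                                         0            1         2     3   4  5 6
    }
}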
MaxTemperatureMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] dataArray = value.toString().split(",");
        String stationCode = dataArray[0];
        String year = dataArray[1].substring(0, 4);
        String element = dataArray[2];
        int temperature;
        String qualityFlag = dataArray[5];
        String sourceFlag = dataArray[6];
        if (!dataArray[3].isEmpty()) {
            temperature = Integer.parseInt(dataArray[3]);
        } else {
            temperature = 0;
        }
        System.out.println(temperature + "-temperature|");
        System.out.println(stationCode + "-stationCode|");
        if (qualityFlag.isEmpty() && sourceFlag.isEmpty() && element.equalsIgnoreCase("TMAX")) {
            context.write(new Text(year), new IntWritable(temperature));
        }
    }
}
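As a side note, the two System.out.println calls above go to each map task's stdout log in a distributed run, not to the console the job was launched from, so they are only visible through the task logs / job UI. A counter-based alternative (sketch only, with made-up counter names) would surface the skip/write counts in the job's counter summary; the fragment below shows the relevant change inside map():

    // Fragment of map(), sketch only; the group/counter names are arbitrary.
    if (qualityFlag.isEmpty() && sourceFlag.isEmpty() && element.equalsIgnoreCase("TMAX")) {
        context.getCounter("MaxTemp", "records-written").increment(1);
        context.write(new Text(year), new IntWritable(temperature));
    } else {
        context.getCounter("MaxTemp", "records-skipped").increment(1);
    }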
MaxTemperatureReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxTemp = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxTemp = Math.max(maxTemp, value.get());
        }
        context.write(new Text(key), new IntWritable(maxTemp));
    }
}
MaxYearTemperature.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MaxYearTemperature {

    public static void main(String[] args) throws Exception {
        System.out.println(args);
        Configuration conf = new Configuration();
        String[] programArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (programArgs.length != 2) {
            System.err.println("Usage: MaxTemp <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, " Max Temp");
        job.setJarByClass(MaxYearTemperature.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(programArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(programArgs[1]));
        // Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I figured it out.
It actually had nothing to do with the jump from 0% to 100%.
One of the array values I was reading from the split array was always empty in the dataset, so control never entered the if condition that writes to the context.
Once I corrected that, it started working.
Thanks for your time.
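For completeness, the kind of guard that made it work for me looks roughly like the fragment below. This is a sketch only: it assumes that the quality flag is the only flag that must be blank, that the temperature field must be non-empty, and that the field indices match my dataset; adjust both to the actual layout of your data.

    // Sketch of the corrected filter inside map(); indices and flag semantics are assumptions.
    if (dataArray.length > 5
            && element.equalsIgnoreCase("TMAX")
            && !dataArray[3].isEmpty()        // temperature value is present
            && dataArray[5].isEmpty()) {      // quality flag is blank
        context.write(new Text(year), new IntWritable(Integer.parseInt(dataArray[3])));
    }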