MapReduce代码查找城市的最高温度
MapReduce Code to find the maximum temperature of city
问题陈述:使用 MapReduce 查找每个城市的最高温度
输入:
Kolkata,56
Jaipur,45
Delhi,43
Mumbai,34
Goa,45
Kolkata,35
Jaipur,34
Delhi,32
输出:
Kolkata 56
Jaipur 45
Delhi 43
Mumbai 34
Goa 45
我写了下面的代码:
Mapper 类:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that turns one text line of the form "city,temperature" into a
 * (city, temperature) pair.
 */
public class Map
extends Mapper<LongWritable, Text, Text, IntWritable>{
// Instance-level holders that map() overwrites and writes out on every call.
private IntWritable max = new IntWritable();
private Text word = new Text();
/**
 * Tokenizes the line on ',' and tab, uses the first token as the city and the
 * second token, parsed as an int, as the temperature, then writes the pair.
 *
 * NOTE(review): neither nextToken() call is preceded by a hasMoreTokens() check,
 * so a line with fewer than two tokens (for example an empty line) throws
 * java.util.NoSuchElementException, and a non-numeric second token makes
 * Integer.parseInt throw NumberFormatException.
 *
 * @param key     input key supplied by the framework; not used here
 * @param value   the input line
 * @param context sink for the emitted (city, temperature) pair
 */
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer line = new StringTokenizer(value.toString(),",\t");
word.set(line.nextToken());
max.set(Integer.parseInt(line.nextToken()));
context.write(word,max);
}
}
Reducer 类:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that scans the values of a key and writes the key together with the
 * running maximum held in max_temp.
 */
public class Reduce
extends Reducer<Text, IntWritable, Text, IntWritable>{
// NOTE(review): both variables are instance fields and reduce() never resets
// them, so the maximum found for one key is still in max_temp when the next key
// is processed by the same Reduce instance; a later key whose own values are all
// smaller is then written with the earlier, larger value.
private int max_temp = Integer.MIN_VALUE;
private int temp = 0;
/**
 * Raises max_temp to the largest value seen and writes (key, max_temp).
 *
 * @param key     the key whose values are being reduced
 * @param values  the values grouped under the key
 * @param context sink for the emitted (key, max_temp) pair
 */
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
Iterator<IntWritable> itr = values.iterator();
while (itr.hasNext()) {
temp = itr.next().get();
// Keep the larger of the current value and the maximum seen so far.
if( temp > max_temp)
{
max_temp = temp;
}
}
context.write(key, new IntWritable(max_temp));
}
}
Driver Class:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Command-line driver that configures and submits the "Max Temperature" job.
 *
 * <p>Usage: {@code MaxTempDriver <input path> <output path>}
 */
public class MaxTempDriver {

    /**
     * Builds the job from the two path arguments, runs it and exits with status 0
     * when the job succeeds or 1 when it fails. Exits with status 2 when the
     * required arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path;
     *             any further arguments are ignored
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Report missing arguments with a usage message instead of letting
        // args[0] / args[1] fail with ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MaxTempDriver <input path> <output path>");
            System.exit(2);
        }

        // Create a new job.
        // NOTE(review): on Hadoop 2+ this constructor is deprecated in favour of
        // Job.getInstance(); confirm the target Hadoop version before switching.
        Job job = new Job();

        // Tell Hadoop which jar to ship by pointing at a class inside it.
        job.setJarByClass(MaxTempDriver.class);
        // Set job name to locate it in the distributed environment
        job.setJobName("Max Temperature");

        // Set input and output Path, note that we use the default input format
        // which is TextInputFormat (each record is a line of input)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Set Mapper and Reducer class. Reduce is also used as the combiner, which
        // is only valid as long as Reduce computes a true per-key maximum.
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Set Output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
我收到以下错误:
17/06/15 10:44:17 INFO mapred.JobClient: Task Id :
attempt_201706151011_0002_m_000000_1, Status : FAILED
java.util.NoSuchElementException
at java.util.StringTokenizer.nextToken(StringTokenizer.java:349)
at Map.map(Map.java:23)
at Map.map(Map.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
如您所见,我在 map 函数中遇到了 java.util.NoSuchElementException。请帮我解决这个异常,并提供修改 map() 代码的建议。
检查下一个token是否存在:
/**
 * Parses one "city,temperature" line and emits (city, temperature).
 *
 * <p>A line is skipped, and counted under the "MaxTemp" / "MALFORMED_RECORDS"
 * counter, when it has fewer than two tokens or when its second token is not an
 * integer. Nothing is written for such a line.
 *
 * @param key     input key supplied by the framework; not used here
 * @param value   the input line
 * @param context sink for the emitted pair and for the malformed-record counter
 * @throws IOException          if writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer line = new StringTokenizer(value.toString(), ",\t");

    // Require both fields before emitting anything: writing after reading only the
    // city would pair it with the temperature left in `max` by the previous record.
    if (line.countTokens() < 2) {
        context.getCounter("MaxTemp", "MALFORMED_RECORDS").increment(1);
        return;
    }

    String city = line.nextToken();
    // trim() tolerates "Kolkata, 56" and a trailing '\r' from CRLF input files.
    String reading = line.nextToken().trim();

    int temperature;
    try {
        temperature = Integer.parseInt(reading);
    } catch (NumberFormatException malformed) {
        // One bad line should not fail the whole task; count it and move on.
        context.getCounter("MaxTemp", "MALFORMED_RECORDS").increment(1);
        return;
    }

    word.set(city);
    max.set(temperature);
    context.write(word, max);
}
当我尝试这个 MapReduce 示例时,我注意到一个问题:一旦出现某个最高温度,它就会“级联”到排在该城市之后的所有城市的结果中。
输出看起来与此类似,
Delhi 43
Goa 45
Jaipur 45
Kolkata 56
Mumbai 56
而预期的输出应该是:
Delhi 43
Goa 45
Jaipur 45
Kolkata 56
Mumbai 34
可以看到孟买的最后一个值是56度(这是加尔各答的最高温度)
我注意到,这是因为每次调用 reduce 函数时都没有重置 temp 和 max_temp。
在 Reduce 类的 reduce 函数中、while 循环之前添加以下两行,即可解决这个问题:
// Reset the per-key state at the start of every reduce() call; both variables are
// instance fields of Reduce and would otherwise keep the values left by the
// previous key.
temp = 0;
max_temp = Integer.MIN_VALUE;
问题陈述:使用 MapReduce 查找每个城市的最高温度
输入:
Kolkata,56
Jaipur,45
Delhi,43
Mumbai,34
Goa,45
Kolkata,35
Jaipur,34
Delhi,32
输出:
Kolkata 56
Jaipur 45
Delhi 43
Mumbai 34
Goa 45
我写了下面的代码:
Mapper 类:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that turns one text line of the form "city,temperature" into a
 * (city, temperature) pair.
 */
public class Map
extends Mapper<LongWritable, Text, Text, IntWritable>{
// Instance-level holders that map() overwrites and writes out on every call.
private IntWritable max = new IntWritable();
private Text word = new Text();
/**
 * Tokenizes the line on ',' and tab, uses the first token as the city and the
 * second token, parsed as an int, as the temperature, then writes the pair.
 *
 * NOTE(review): neither nextToken() call is preceded by a hasMoreTokens() check,
 * so a line with fewer than two tokens (for example an empty line) throws
 * java.util.NoSuchElementException, and a non-numeric second token makes
 * Integer.parseInt throw NumberFormatException.
 *
 * @param key     input key supplied by the framework; not used here
 * @param value   the input line
 * @param context sink for the emitted (city, temperature) pair
 */
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer line = new StringTokenizer(value.toString(),",\t");
word.set(line.nextToken());
max.set(Integer.parseInt(line.nextToken()));
context.write(word,max);
}
}
Reducer 类:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that scans the values of a key and writes the key together with the
 * running maximum held in max_temp.
 */
public class Reduce
extends Reducer<Text, IntWritable, Text, IntWritable>{
// NOTE(review): both variables are instance fields and reduce() never resets
// them, so the maximum found for one key is still in max_temp when the next key
// is processed by the same Reduce instance; a later key whose own values are all
// smaller is then written with the earlier, larger value.
private int max_temp = Integer.MIN_VALUE;
private int temp = 0;
/**
 * Raises max_temp to the largest value seen and writes (key, max_temp).
 *
 * @param key     the key whose values are being reduced
 * @param values  the values grouped under the key
 * @param context sink for the emitted (key, max_temp) pair
 */
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
Iterator<IntWritable> itr = values.iterator();
while (itr.hasNext()) {
temp = itr.next().get();
// Keep the larger of the current value and the maximum seen so far.
if( temp > max_temp)
{
max_temp = temp;
}
}
context.write(key, new IntWritable(max_temp));
}
}
Driver Class:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Command-line driver that configures and submits the "Max Temperature" job.
 *
 * <p>Usage: {@code MaxTempDriver <input path> <output path>}
 */
public class MaxTempDriver {

    /**
     * Builds the job from the two path arguments, runs it and exits with status 0
     * when the job succeeds or 1 when it fails. Exits with status 2 when the
     * required arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path;
     *             any further arguments are ignored
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Report missing arguments with a usage message instead of letting
        // args[0] / args[1] fail with ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MaxTempDriver <input path> <output path>");
            System.exit(2);
        }

        // Create a new job.
        // NOTE(review): on Hadoop 2+ this constructor is deprecated in favour of
        // Job.getInstance(); confirm the target Hadoop version before switching.
        Job job = new Job();

        // Tell Hadoop which jar to ship by pointing at a class inside it.
        job.setJarByClass(MaxTempDriver.class);
        // Set job name to locate it in the distributed environment
        job.setJobName("Max Temperature");

        // Set input and output Path, note that we use the default input format
        // which is TextInputFormat (each record is a line of input)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Set Mapper and Reducer class. Reduce is also used as the combiner, which
        // is only valid as long as Reduce computes a true per-key maximum.
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Set Output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
我收到以下错误:
17/06/15 10:44:17 INFO mapred.JobClient: Task Id :
attempt_201706151011_0002_m_000000_1, Status : FAILED
java.util.NoSuchElementException
at java.util.StringTokenizer.nextToken(StringTokenizer.java:349)
at Map.map(Map.java:23)
at Map.map(Map.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
如您所见,我在 map 函数中遇到了 java.util.NoSuchElementException。请帮我解决这个异常,并提供修改 map() 代码的建议。
检查下一个token是否存在:
/**
 * Parses one "city,temperature" line and emits (city, temperature).
 *
 * <p>A line is skipped, and counted under the "MaxTemp" / "MALFORMED_RECORDS"
 * counter, when it has fewer than two tokens or when its second token is not an
 * integer. Nothing is written for such a line.
 *
 * @param key     input key supplied by the framework; not used here
 * @param value   the input line
 * @param context sink for the emitted pair and for the malformed-record counter
 * @throws IOException          if writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer line = new StringTokenizer(value.toString(), ",\t");

    // Require both fields before emitting anything: writing after reading only the
    // city would pair it with the temperature left in `max` by the previous record.
    if (line.countTokens() < 2) {
        context.getCounter("MaxTemp", "MALFORMED_RECORDS").increment(1);
        return;
    }

    String city = line.nextToken();
    // trim() tolerates "Kolkata, 56" and a trailing '\r' from CRLF input files.
    String reading = line.nextToken().trim();

    int temperature;
    try {
        temperature = Integer.parseInt(reading);
    } catch (NumberFormatException malformed) {
        // One bad line should not fail the whole task; count it and move on.
        context.getCounter("MaxTemp", "MALFORMED_RECORDS").increment(1);
        return;
    }

    word.set(city);
    max.set(temperature);
    context.write(word, max);
}
当我尝试这个 MapReduce 示例时,我注意到一个问题:一旦出现某个最高温度,它就会“级联”到排在该城市之后的所有城市的结果中。
输出看起来与此类似,
Delhi 43
Goa 45
Jaipur 45
Kolkata 56
Mumbai 56
而预期的输出应该是:
Delhi 43
Goa 45
Jaipur 45
Kolkata 56
Mumbai 34
可以看到孟买的最后一个值是56度(这是加尔各答的最高温度)
我注意到,这是因为每次调用 reduce 函数时都没有重置 temp 和 max_temp。
在 Reduce 类的 reduce 函数中、while 循环之前添加以下两行,即可解决这个问题:
// Reset the per-key state at the start of every reduce() call; both variables are
// instance fields of Reduce and would otherwise keep the values left by the
// previous key.
temp = 0;
max_temp = Integer.MIN_VALUE;