MapReduce code to find the maximum temperature of each city

Problem statement: use MapReduce to find the maximum temperature for each city.

Input:

Kolkata,56
Jaipur,45
Delhi,43
Mumbai,34
Goa,45
Kolkata,35
Jaipur,34
Delhi,32

Expected output:

Kolkata   56
Jaipur    45
Delhi     43
Mumbai    34
Goa       45

I wrote the following code:

Map:

import java.io.IOException; 
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map 
        extends Mapper<LongWritable, Text, Text, IntWritable>{

    private IntWritable max = new IntWritable();
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        StringTokenizer line = new StringTokenizer(value.toString(),",\t");

        word.set(line.nextToken());
        max.set(Integer.parseInt(line.nextToken()));

        context.write(word,max);            

        }
    }

Reduce:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce 
        extends Reducer<Text, IntWritable, Text, IntWritable>{

    private int max_temp = Integer.MIN_VALUE;
    private int temp = 0;

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, 
            Context context)
            throws IOException, InterruptedException {

        Iterator<IntWritable> itr = values.iterator();

        while (itr.hasNext()) {

            temp = itr.next().get();
            if( temp > max_temp)
            {
                max_temp = temp;
            }
        }

        context.write(key, new IntWritable(max_temp));
    }
}

Driver Class:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTempDriver {
    public static void main(String[] args) throws Exception {        

        // Create a new job
        Job job = new Job();

        // Set job name to locate it in the distributed environment
        job.setJarByClass(MaxTempDriver.class);
        job.setJobName("Max Temperature");

        // Set input and output Path, note that we use the default input format
        // which is TextInputFormat (each record is a line of input)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Set Mapper and Reducer class
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Set Output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

I am getting the following error:

17/06/15 10:44:17 INFO mapred.JobClient: Task Id : attempt_201706151011_0002_m_000000_1, Status : FAILED
java.util.NoSuchElementException
at java.util.StringTokenizer.nextToken(StringTokenizer.java:349)
at Map.map(Map.java:23)
at Map.map(Map.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)

As you can see, I am getting java.util.NoSuchElementException in the map function. Please help me resolve this exception and suggest how to change the map() code.

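StringTokenizer.nextToken() throws NoSuchElementException when no tokens are left, which happens for a blank line or a line without a temperature. Check that the next token exists before reading it: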

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

    StringTokenizer line = new StringTokenizer(value.toString(), ",\t");

    // Emit a record only when both the city and the temperature are present.
    // Writing after reading just one token would reuse the value left in
    // max by the previous record.
    if (line.countTokens() >= 2) {
        word.set(line.nextToken());
        max.set(Integer.parseInt(line.nextToken()));
        context.write(word, max);
    }
}
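The same check can be tried outside Hadoop. The following is only a minimal plain-Java sketch: `TemperatureLineParser`, `Reading`, and `parse` are hypothetical names that do not appear in the original job. It uses `String.split` on the same delimiters (comma or tab) instead of `StringTokenizer`, and it also treats a non-numeric temperature as a malformed line, which the mapper above does not do.

```java
import java.util.Optional;

/** Hypothetical helper, not part of the original job: parses one "city,temperature" line. */
final class TemperatureLineParser {

    /** Simple immutable holder for one parsed record. */
    static final class Reading {
        final String city;
        final int temperature;

        Reading(String city, int temperature) {
            this.city = city;
            this.temperature = temperature;
        }
    }

    /** Returns the parsed reading, or an empty Optional for blank or malformed lines. */
    static Optional<Reading> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // Split on a comma or a tab, the delimiters the mapper uses.
        String[] parts = line.trim().split("[,\t]");
        if (parts.length != 2 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            int temperature = Integer.parseInt(parts[1].trim());
            return Optional.of(new Reading(parts[0].trim(), temperature));
        } catch (NumberFormatException e) {
            // Non-numeric temperature: treat the line as malformed.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("Kolkata,56").isPresent()); // true
        System.out.println(parse("").isPresent());           // false (blank line)
        System.out.println(parse("Goa").isPresent());        // false (no temperature)
    }
}
```

Inside a mapper, an empty result would simply mean skipping the `context.write` call for that line.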

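One thing I noticed when I tried this particular MapReduce example is that the maximum value cascades to every key that comes after the city with the highest temperature.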

The output looked similar to this:

Delhi   43
Goa     45
Jaipur  45
Kolkata 56
Mumbai  56

instead of this:

Delhi   43
Goa     45
Jaipur  45
Kolkata 56
Mumbai  34

You can see that the last value, for Mumbai, is 56 degrees (which is the maximum temperature for Kolkata).

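This happens because temp and max_temp are instance fields of the Reduce class and are not reset on each call to the reduce function. The same Reduce instance handles several keys, so the maximum found for an earlier key carries over to later ones.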

Adding the following two lines to the reduce function of the Reduce class, just before the while loop, fixes the problem:

temp = 0;
max_temp = Integer.MIN_VALUE;
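To see the effect without a cluster, here is a plain-Java simulation of one object handling two keys in sequence. It is only a sketch: `ReducerStateDemo`, `reduceWithField`, and `reduceWithLocal` are hypothetical names, and no Hadoop classes are involved.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java simulation (no Hadoop): one object handles several keys in turn. */
final class ReducerStateDemo {

    // Mirrors the max_temp instance field of the original Reduce class.
    private int maxTemp = Integer.MIN_VALUE;

    /** Buggy variant: the field still holds the maximum seen for earlier keys. */
    int reduceWithField(List<Integer> values) {
        for (int temp : values) {
            if (temp > maxTemp) {
                maxTemp = temp;
            }
        }
        return maxTemp;
    }

    /** Fixed variant: a local variable starts fresh on every call. */
    static int reduceWithLocal(List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int temp : values) {
            max = Math.max(max, temp);
        }
        return max;
    }

    public static void main(String[] args) {
        ReducerStateDemo reducer = new ReducerStateDemo();
        List<Integer> kolkata = Arrays.asList(56, 35);
        List<Integer> mumbai = Arrays.asList(34);

        // Keys arrive in sorted order, so Kolkata is reduced before Mumbai.
        System.out.println("Kolkata " + reducer.reduceWithField(kolkata)); // Kolkata 56
        System.out.println("Mumbai  " + reducer.reduceWithField(mumbai));  // Mumbai  56 (stale)
        System.out.println("Mumbai  " + reduceWithLocal(mumbai));          // Mumbai  34
    }
}
```

In the real Reduce class, declaring the two variables as locals inside reduce() has the same effect as resetting the fields, and leaves no state to carry over between keys.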