使用 MapReduce 获得所需的输出

Using MapReduce to get the desired output

请指点我该如何得到想要的输出。

当前的输出如下：

阿尔巴尼亚 3607    （格式：国家 最小人口）

阿尔巴尼亚 418495  （格式：国家 最大人口）

期望的输出：

国家 城市 最小人口

国家 城市 最大人口

Reducer 类：

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Emits, for every key, the smallest and then the largest population value received.
 *
 * <p>Input: key -> population values (as emitted by handson3Mapper).
 * Output: two records per key, {@code key min} followed by {@code key max}.
 *
 * <p>Note: the values here are bare integers, so this reducer cannot tell which city a
 * population belongs to. Reporting the city requires the mapper to emit the city together
 * with the population (e.g. as a Text value).
 */
public class Handson3Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        int minValue = Integer.MAX_VALUE;
        for (IntWritable value : values) {
            // Read the value once and reuse it for both comparisons.
            int population = value.get();
            maxValue = Math.max(maxValue, population);
            minValue = Math.min(minValue, population);
        }
        context.write(key, new IntWritable(minValue));
        context.write(key, new IntWritable(maxValue));
    }

}

Mapper 类：

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class handson3Mapper extends  Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;
        
    @Override
    
     public void map(LongWritable key, Text value, Context context)  throws IOException, InterruptedException {
        
        int populationVal;
        String line = value.toString();
        String field[] = line.split(",");
        String country = field[4].substring(1, field[4].length()-1);
        String newString = country.concat(field[0].substring(1, field[0].length()-1));
        
        String population = field[9].substring(1, field[9].length()-1);
        String city = field[0].substring(1, field[0].length()-1);

        
        if (!population.matches(".*\d.*") || population.equals("")||
                population.matches("([0-9].*)\.([0-9].*)") ){
                return;
            }else{
                populationVal = Integer.parseInt(population);
                context.write(new Text(country),new IntWritable(populationVal));
            }
        }
    
    }

Runner（驱动）类：

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class handsonJobRunner {
    public int run(String[] args) throws Exception  {

        if(args.length !=2) {

            System.err.println("Usage: Handson3 <input path> <outputpath>");
            System.exit(-1);

        }
Job job = new Job();
        
        job.setJarByClass(handsonJobRunner.class);

        job.setJobName("Handson 3");

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.setMapperClass(handson3Mapper.class);

        job.setReducerClass(Handson3Reducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0:1);

        boolean success = job.waitForCompletion(true);

        return success ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {
        handsonJobRunner driver = new handsonJobRunner();
        driver.run(args);


    }

}

先谢谢大家，任何指点都将不胜感激。

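您应该把城市和人口一起作为 value 发送给 Reducer，然后在 Reducer 中为每个国家选出人口最多和最少的城市。注意：这样 Mapper 输出的 value 类型（Text）就与作业最终输出的 value 类型（IntWritable）不同了，因此还必须在驱动类中调用 job.setMapOutputKeyClass(Text.class) 和 job.setMapOutputValueClass(Text.class)，否则作业会因 "Type mismatch in value from map" 而失败。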

你的 Mapper 应该改成这样：

public class Handson3Mapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final int MISSING = 9999;

    @Override

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        int populationVal;
        String line = value.toString();
        String field[] = line.split(",");
        String country = field[4].substring(1, field[4].length() - 1);
        String newString = country.concat(field[0].substring(1, field[0].length() - 1));

        String population = field[9].substring(1, field[9].length() - 1);
        String city = field[0].substring(1, field[0].length() - 1);


        if (!population.matches(".*\d.*") || population.equals("") ||
                population.matches("([0-9].*)\.([0-9].*)")) {
            return;
        } else {
            populationVal = Integer.parseInt(population);
            context.write(new Text(country), new Text(city + "-" + populationVal));
        }
    }

}

你的 Reducer 应该换成这样：

/**
 * For each country, emits the least-populated city and then the most-populated city.
 *
 * <p>Values are expected in the form {@code city-population}, as produced by Handson3Mapper.
 * Each value is split at its LAST '-' so that hyphenated city names such as "Wilkes-Barre"
 * are kept intact. On ties, the first city seen wins.
 *
 * <p>Output: {@code "country minCity" -> minPopulation}, then
 * {@code "country maxCity" -> maxPopulation}. Values that are not in {@code city-population}
 * form are skipped. Nothing is written for a key with no usable values.
 */
public class Handson3Reducer extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String maxPopulationCityName = "";
        String minPopulationCityName = "";
        int maxValue = Integer.MIN_VALUE;
        int minValue = Integer.MAX_VALUE;
        boolean sawValue = false;

        for (Text value : values) {
            String entry = value.toString();
            int separator = entry.lastIndexOf('-');
            if (separator < 0) {
                continue; // not in city-population form
            }

            int population;
            try {
                population = Integer.parseInt(entry.substring(separator + 1));
            } catch (NumberFormatException malformed) {
                continue; // population part is not an int
            }
            String city = entry.substring(0, separator);
            sawValue = true;

            if (population > maxValue) {
                maxPopulationCityName = city;
                maxValue = population;
            }
            if (population < minValue) {
                minPopulationCityName = city;
                minValue = population;
            }
        }

        if (!sawValue) {
            return; // avoid emitting the MIN_VALUE/MAX_VALUE sentinels
        }
        context.write(new Text(key + " " + minPopulationCityName), new IntWritable(minValue));
        context.write(new Text(key + " " + maxPopulationCityName), new IntWritable(maxValue));
    }

}