How can I invoke the Reducer class in a MapReduce job so that no duplicate keys are returned in the reducer output?

I tried the MapReduce code below; I have a Driver, a Mapper, and a Reducer. The mapper reads each record from the input file and determines the reason for the flight's delay, keyed on (flight number, origin, destination, airport). The reducer is supposed to iterate over the values for each key and emit the reason with the maximum delay per key, but I am seeing records with duplicate keys in the reducer output. Is my reducer not being invoked, or is the code wrong? My logic should not produce duplicate keys/records.

Driver Class:

package com.airlines.driver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.airlines.reducer.MainReasonForDelayReducer;
import com.arilines.mapper.MainReasonForDelayMapper;


public class MainReasonForDelayDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration c = new Configuration();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(new Configuration());
        // true stands for recursively deleting the folder you gave
        fs.delete(output, true);
        Job job = Job.getInstance(c, "Airlines - main reason for delay");
        job.setJarByClass(MainReasonForDelayDriver.class);
        job.setMapperClass(MainReasonForDelayMapper.class);
        job.setReducerClass(MainReasonForDelayReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        int result = job.waitForCompletion(true) ? 0 : 1;
        // Print a Message with the Job Status
        if (result == 0) {
            System.out.println("-----JOB SUCCESS-----");
        } else {
            System.out.println("--------- JOB FAILED -----------");
        }
    }

}

Mapper Class:

package com.arilines.mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MainReasonForDelayMapper extends Mapper<LongWritable, Text, Text, Text> {
    public static final int flight_Num = 9;
    public static final int origin = 16;
    public static final int destination = 17;
    public static final int airport = 31;
    public static final int carrierDelay = 24;
    public static final int weatherDelay = 25;
    public static final int NASDelay = 26;
    public static final int securityDelay = 27;
    public static final int lateAircraftDelay = 28;
    public static final int sumOfDelays = 29;

    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
        String outline = "";
        String line = value.toString();
        String[] words = line.split(",");
        Map<String, Integer> delayValues = new HashMap<>();
        delayValues.put("carrierDelay", Integer.parseInt(words[carrierDelay]));
        delayValues.put("weatherDelay", Integer.parseInt(words[weatherDelay]));
        delayValues.put("NASDelay", Integer.parseInt(words[NASDelay]));
        delayValues.put("securityDelay", Integer.parseInt(words[securityDelay]));
        delayValues.put("lateAircraftDelay", Integer.parseInt(words[lateAircraftDelay]));
        int max = 0;
        List<String> keys = new ArrayList<>();
        keys.addAll(delayValues.keySet());

        for (int i = 0; i < delayValues.size(); i++) {
            if (delayValues.get(keys.get(i)) >= max) {
                max = delayValues.get(keys.get(i));
            }
        }
        String delayReason = "no delay";
        if (max != 0) {
            delayReason = (String) getKeyFromValue(delayValues, max);
        }
        outline = max + "," + delayReason;
        Text outlinekey = new Text(
                words[flight_Num] + "," + words[origin] + "," + words[destination] + "," + words[airport]);
        con.write(outlinekey, new Text(outline));
    }

    public static Object getKeyFromValue(Map hm, Object value) {
        for (Object o : hm.keySet()) {
            if (hm.get(o).equals(value)) {
                return o;
            }
        }
        return null;
    }

}
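As an aside, the mapper's two-pass approach (find the max delay, then reverse-look-up its key with getKeyFromValue) can be collapsed into a single pass over entrySet(). A minimal standalone sketch of that logic (class and method names are my own, not from the question's code):

```java
import java.util.HashMap;
import java.util.Map;

public class MaxDelay {

    // Pick the delay reason with the largest value in one pass over the map.
    // Falls back to "no delay" when every delay value is zero.
    static String mainReason(Map<String, Integer> delays) {
        int max = 0;
        String reason = "no delay";
        for (Map.Entry<String, Integer> e : delays.entrySet()) {
            if (e.getValue() > max) {
                max = e.getValue();
                reason = e.getKey();
            }
        }
        return max + "," + reason;
    }

    public static void main(String[] args) {
        Map<String, Integer> delays = new HashMap<>();
        delays.put("carrierDelay", 0);
        delays.put("weatherDelay", 45);
        delays.put("NASDelay", 10);
        System.out.println(mainReason(delays)); // 45,weatherDelay
    }
}
```

This also avoids the unchecked raw-type Map in getKeyFromValue, though it does not affect the duplicate-keys problem.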

Reducer Class:

package com.airlines.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MainReasonForDelayReducer extends Reducer<Text, Text, Text, Text> {

    public void reducer(Text key, Iterator<Text> value, Context con) throws IOException, InterruptedException {
        String outline = "";
        int maxDelay = 0;
        String delayReason = "no delay";
        System.out.println(key + "reducer values.... " + value);
        while (value.hasNext()) {
            String tokens[] = value.next().toString().split(",");
            if (Integer.valueOf(tokens[0]) > maxDelay) {
                maxDelay = Integer.valueOf(tokens[0]);
                delayReason = tokens[1];
            }
        }

        outline = maxDelay + "," + delayReason;

        con.write(key, new Text(outline));
    }

}

Sample output data:

3866,ABI,DFW,Abilene Regional   0,no delay
3866,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay

Sample key and value:

Key - 3892,ABI,DFW,Abilene Regional
Value - 0,no delay

I think the problem may be that your reducer is not correctly overriding the reduce method.

public void reducer(Text key, Iterator<Text> value, Context con) throws IOException, InterruptedException {
        String outline = "";

The correct way to override it is:

@Override
public void reduce(Text key, Iterable<Text> iterable_values, Context context) throws IOException , InterruptedException {

Note the method name reduce, not reducer as you wrote. Also note that for the newer version of the MapReduce API that you are using, the values parameter is Iterable&lt;Text&gt;, not Iterator&lt;Text&gt;. Because your method signature does not match, it never overrides Reducer.reduce(); Hadoop falls back to the default identity reduce, which writes every (key, value) pair through unchanged, and that is exactly the duplicate-key output you are seeing. Adding the @Override annotation would have turned this silent mistake into a compile error.

Iterator&lt;Text&gt; belongs to the old version of the API, which lives under import org.apache.hadoop.mapred, while the newer version lives under import org.apache.hadoop.mapreduce.
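To see why fixing the signature removes the duplicates, here is a plain-Java sketch (no Hadoop dependency; class and method names are my own) of what the framework does once reduce() is properly overridden: all values for one key arrive in a single call, so each key is written exactly once, with the max-delay logic from your reducer applied per group:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceSimulation {

    // Mimics the shuffle/reduce contract: the framework groups all values
    // by key, then calls the reduce logic once per key, so the output
    // contains one line per distinct key.
    static Map<String, String> reduceAll(Map<String, List<String>> grouped) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            int maxDelay = 0;
            String delayReason = "no delay";
            // Same per-key logic as the question's reducer body.
            for (String value : e.getValue()) {
                String[] tokens = value.split(",");
                if (Integer.parseInt(tokens[0]) > maxDelay) {
                    maxDelay = Integer.parseInt(tokens[0]);
                    delayReason = tokens[1];
                }
            }
            out.put(e.getKey(), maxDelay + "," + delayReason);
        }
        return out;
    }
}
```

With the mis-named reducer method, Hadoop never runs this grouping logic for you; it is the identity reduce that emits one line per input value instead of one line per key.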