Reduce 函数不影响最终输出

Reduce function is not affecting the final output

我从 Mapreduce 代码中得到奇怪的输出:

输入:

aa bb  
aa cc  
bb aa  
cc dd  
dd bb  
xx aa  
ss rr

输出:

aa  org.mapreduce.userscore.UserScore$ScoreWritable@1  
aa  org.mapreduce.userscore.UserScore$ScoreWritable@0  
aa  org.mapreduce.userscore.UserScore$ScoreWritable@1  
aa  org.mapreduce.userscore.UserScore$ScoreWritable@0  
bb  org.mapreduce.userscore.UserScore$ScoreWritable@0  
bb  org.mapreduce.userscore.UserScore$ScoreWritable@0  
bb  org.mapreduce.userscore.UserScore$ScoreWritable@1  
cc  org.mapreduce.userscore.UserScore$ScoreWritable@1  
cc  org.mapreduce.userscore.UserScore$ScoreWritable@0  
dd  org.mapreduce.userscore.UserScore$ScoreWritable@1  
dd  org.mapreduce.userscore.UserScore$ScoreWritable@0  
rr  org.mapreduce.userscore.UserScore$ScoreWritable@0  
ss  org.mapreduce.userscore.UserScore$ScoreWritable@1  
xx  org.mapreduce.userscore.UserScore$ScoreWritable@1  

代码:

package org.mapreduce.userscore;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class UserScore  {

 /**
  * Custom Hadoop {@code Writable} used as the map output value: a pair of
  * {@code IntWritable} fields, {@code N} and {@code M}.
  *
  * NOTE(review): this version does not override {@code toString()}, so any
  * text rendering falls back to {@code Object.toString()}
  * (class name + '@' + hex of {@code hashCode()}), which matches the
  * {@code ScoreWritable@1} / {@code ScoreWritable@0} lines in the output above.
  */
 public static class ScoreWritable implements Writable {
            // First and second component of the pair.
            private IntWritable N;
            private IntWritable M;

            // Default constructor: allocates both fields so readFields() has
            // objects to deserialize into.
            public ScoreWritable() {
                this.N = new IntWritable();
                this.M = new IntWritable();
            }

            // Custom constructor: stores the given references as-is (no copy).
            public ScoreWritable(IntWritable N, IntWritable M){
                this.N = N;
                this.M = M;
            }

            // Setter: replaces both fields with the given references (no copy).
            public void set(IntWritable NN,IntWritable MM) {
                this.N = NN;
                this.M = MM;
            }

            // Returns the first component (N).
            public IntWritable getN() {
                return N;
            }

            // Returns the second component (M).
            public IntWritable getM() {
                return M;
            }

            @Override
            // Deserializes the pair from the byte stream: N first, then M.
            // The order must mirror write().
            public void readFields(DataInput in) throws IOException {
                N.readFields(in);
                M.readFields(in);
            }

            @Override
            // Serializes the pair into the byte stream: N first, then M.
            public void write(DataOutput out) throws IOException {
                N.write(out);
                M.write(out);
            }

            // NOTE(review): equals() is left commented out while hashCode() below
            // is overridden, so equality is still reference-based.
            //@Override
            //public boolean equals(Object o) {
                //if (o instanceof ScoreWritable) {
                //ScoreWritable other = (ScoreWritable) o;
                //return N.equals(other.N) && M.equals(other.M);
                //}
                //return false;
            //}

            // Hash is derived from N only; M does not contribute.
            @Override
            public int hashCode() {
                return N.hashCode();
            }

 }

 /**
  * Mapper: splits each input line on whitespace and emits one
  * (token, ScoreWritable) record per token. The first token of a line is
  * emitted with (N=1, M=0); every later token with (N=0, M=1).
  */
 public static class Map extends Mapper<LongWritable, Text, Text, ScoreWritable> {
    // Output key and value objects, reused across map() calls.
    private Text user = new Text();
    private ScoreWritable score = new ScoreWritable();
    // These initial instances are replaced by fresh IntWritables inside map().
    private IntWritable NN = new IntWritable();
    private IntWritable MM = new IntWritable();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Despite its name, this is a position flag: 1 while the first token
        // of the line has not been handled yet.
        int iterator = 1;
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            user.set(tokenizer.nextToken());
            if (iterator == 1) {
                // First token on the line.
                NN = new IntWritable(1);
                MM = new IntWritable(0);
                iterator += 1;
            } else {
                // Any token after the first.
                NN = new IntWritable(0);
                MM = new IntWritable(1);
            }
            score.set(NN,MM);
            context.write(user, score);
        }
    }
 }

 /**
  * Reducer as posted in the question (debugging state): it ignores
  * {@code values} and writes the constant 2 for every key.
  *
  * NOTE(review): {@code result} is not declared anywhere in this block (its
  * only definition is in the commented-out line), so the class does not
  * compile as shown.
  */
 public static class Reduce extends Reducer<Text, ScoreWritable, Text, IntWritable> {
     private IntWritable resultf = new IntWritable();
     public void reduce(Text key, Iterable<ScoreWritable> values, Context context) throws IOException, InterruptedException {
        // Abandoned attempt: getN()/getM() are called on the Iterable itself
        // rather than on its elements.
        //int result = ((values.getN().get()) * (values.getM()).get());
        resultf.set(result);
        // The assignment expression replaces resultf with a new IntWritable(2)
        // and passes that new object to write().
        context.write(key, resultf = new IntWritable(2));
    }
 }

 /**
  * Job driver: configures and runs the "userscore" job.
  *
  * @param args args[0] is the input path, args[1] is the output path
  * @throws Exception propagated from job setup or execution
  */
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    //Create a new Jar and set the driver class(this class) as the main class of jar:
    Job job = new Job(conf, "userscore");
    job.setJarByClass(UserScore.class);

    //Set the map and reduce classes in the job:
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    // NOTE(review): Reduce is also registered as the combiner, but it is declared
    // to emit IntWritable values while the mapper emits ScoreWritable values.
    job.setCombinerClass(Reduce.class);

    // Final (reducer) output types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // NOTE(review): the map output classes are left unset here; presumably the
    // framework then assumes they equal the job output classes above --
    // confirm against the Hadoop Job documentation.
    //job.setMapOutputKeyClass(Text.class);
    //job.setMapOutputValueClass(ScoreWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Four reduce tasks.
    job.setNumReduceTasks(4);

    //Set the input and the output path from the arguments
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //Run the job and wait for its completion
    // Exit code 0 when waitForCompletion returns true, 1 otherwise.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

}

我正在尝试编写 Mapreduce 代码来读取一个文本文件。文本文件的每一行都有一对字符串,代表社交网络中的两个用户名,其中第一个用户关注第二个用户。我想计算每个用户的关注者总数以及该用户所关注的用户总数,然后将这两个数字相乘,作为每个用户的某种分数。

我的想法是为值创建一个自定义的 Writable class(ScoreWritable),以用户名作为 Text 键、以 ScoreWritable 实例作为值进行传输。请注意,为了排查问题,我已将 Reduce 的输出改为常量“2”,但输出仍然如上所示。

我做错了什么?

我在虚拟机中使用 Cloudera 映像来编译和运行 jar 文件。

您正在使用 TextOutputFormat,它不知道如何打印(作为文本)您的自定义 ScoreWritable,实际上它只是输出 ScoreWritable 实例的字符串表示形式。 我知道最快的解决方法是覆盖 ScoreWritable 的 toString() 方法,例如

public String toString() {
    return "" + N.get() + "\t" + M.get();
}

或者您可以编写自己的自定义输出格式(原文此处的示例链接已丢失)。

希望这对您有所帮助

所以我设法使代码工作。如您所见,存在一些问题:

  1. 自定义 class 的文本输出方式(我猜),感谢 @gtosto 建议重写 toString()
  2. Reducer 中变量的错误使用。
  3. Reducer 中错误的迭代方法。

我还添加了一个单独的 Combiner class,以减少 Mapper 和 Reducer 之间的网络传输量。

这是最终代码:(含注释)

package org.mapreduce.userscore;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class UserScore  {
    // Defining a custom class that contains two IntWritable values.
    // This custom class holds the value part of the key-value pairs passed between the mapper and the reducers.

 /**
  * Value type passed between mapper, combiner and reducer: a pair of counters.
  * <p>
  * {@code N} is the number of users the key user follows; {@code M} is the
  * number of users following the key user. Serialized as N followed by M.
  */
 public static class ScoreWritable implements Writable {
     private IntWritable N;
     private IntWritable M;

     /** Creates an empty pair; both fields are allocated so {@link #readFields} can fill them. */
     public ScoreWritable() {
         this.N = new IntWritable();
         this.M = new IntWritable();
     }

     /**
      * Creates a pair backed by the given objects (references are stored, not copied).
      *
      * @param N "following" counter
      * @param M "followed by" counter
      */
     public ScoreWritable(IntWritable N, IntWritable M) {
         this.N = N;
         this.M = M;
     }

     /**
      * Replaces both counters with the given objects (references are stored, not copied).
      *
      * @param NN "following" counter
      * @param MM "followed by" counter
      */
     public void set(IntWritable NN, IntWritable MM) {
         this.N = NN;
         this.M = MM;
     }

     /** @return the "following" counter */
     public IntWritable getN() {
         return N;
     }

     /** @return the "followed by" counter */
     public IntWritable getM() {
         return M;
     }

     /** Deserializes the pair; the field order must mirror {@link #write}. */
     @Override
     public void readFields(DataInput in) throws IOException {
         N.readFields(in);
         M.readFields(in);
     }

     /** Serializes the pair as N followed by M. */
     @Override
     public void write(DataOutput out) throws IOException {
         N.write(out);
         M.write(out);
     }

     /** Renders the pair as {@code N<TAB>M}; this is what a text output format prints. */
     @Override
     public String toString() {
         return "" + N.get() + "\t" + M.get();
     }

     /** Two pairs are equal when both counters are equal. */
     @Override
     public boolean equals(Object o) {
         if (this == o) {
             return true;
         }
         if (!(o instanceof ScoreWritable)) {
             return false;
         }
         ScoreWritable other = (ScoreWritable) o;
         return N.equals(other.N) && M.equals(other.M);
     }

     /** Hash over both counters, consistent with {@link #equals}. */
     @Override
     public int hashCode() {
         return 31 * N.hashCode() + M.hashCode();
     }

 }

 public static class Map extends Mapper<LongWritable, Text, Text, ScoreWritable> {
    private Text user = new Text();
    private ScoreWritable score = new ScoreWritable();  //variabe sscore will hold the pair (N,M) for eatch user
    private IntWritable NN = new IntWritable();
    private IntWritable MM = new IntWritable();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        int iterator = 1;
        // tokenizing: variable tokenizer will hold the first username then the second username in each ine of the input text file
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            user.set(tokenizer.nextToken());
            if (iterator == 1) {                         // here variabe tokenizer holds the first username
                NN = new IntWritable(1);                 // saying that this user (username1) is folowing ssomeone
                MM = new IntWritable(0);
                iterator += 1;
            } else {                                     // here variabe tokenizer will hold the second username
                NN = new IntWritable(0);
                MM = new IntWritable(1);                 // saying that this user (username2) is being followed by someone
            }
            score.set(NN,MM);                            // giving eiter (1,0) or (0,1) to variable score
            context.write(user, score);                  // assigning variable score for each user in each line
        }   // emitting [Ali, (1,0)] or [Ali, (0,1)] means that Ali is following someone or being followed by someone, respectively.
    }       // next: the Reducer will go through all the values for each key, sum the total internal values of the key.
 }

    public static class Combine extends Reducer<Text, ScoreWritable, Text, ScoreWritable> {
        private IntWritable resultf = new IntWritable();
        private IntWritable NNN = new IntWritable();
        private IntWritable MMM = new IntWritable();
        public void reduce(Text key, Iterable<ScoreWritable> values, Reducer<Text, ScoreWritable, Text, ScoreWritable>.Context context)
                throws IOException, InterruptedException {
            int sum1 = 0;
            int sum2 = 0;
            for (ScoreWritable val:values) {
                sum1 += val.getN().get();
                sum2 += val.getM().get();
            }
            NNN = new IntWritable(sum1);
            MMM = new IntWritable(sum2);
            context.write(key, new ScoreWritable(NNN, MMM));    // this will combine all the values for each key before emitting the new pairs to Reduce function
        }
    }

 public static class Reduce extends Reducer<Text, ScoreWritable, Text, IntWritable> {
     private IntWritable resultf = new IntWritable();
     public void reduce(Text key, Iterable<ScoreWritable> values, Reducer<Text, ScoreWritable, Text, IntWritable>.Context context)
             throws IOException, InterruptedException {
         int sum3 = 0;
         int sum4 = 0;
         for (ScoreWritable val:values) {
             sum3 = val.getN().get();                // if the current user is following 20 people, then Sum3 = 20
             sum4 = val.getM().get();                // if the current user is being followed by 30 people, then Sum4 = 30
         }
         int result = sum3 * sum4;
         resultf.set(result);
         context.write(key, resultf);                // this will emit the current user and his/her corresponding score
    }
 }

 /**
  * Job driver: configures and runs the "userscore" job.
  *
  * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
  * @throws Exception propagated from job setup or execution
  */
 public static void main(String[] args) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
        System.err.println("Usage: UserScore <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();

    // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
    Job job = Job.getInstance(conf, "userscore");
    job.setJarByClass(UserScore.class);

    // Mapper, combiner and reducer. The combiner is a separate class because its
    // output value type (ScoreWritable) differs from the reducer's (IntWritable).
    job.setMapperClass(Map.class);
    job.setCombinerClass(Combine.class);
    job.setReducerClass(Reduce.class);

    // Map output types are set explicitly because they differ from the final output types.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ScoreWritable.class);

    // Final (reducer) output types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Four reduce tasks, hence four output part files.
    job.setNumReduceTasks(4);

    // Input and output paths from the command line.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Run the job and wait for its completion: exit code 0 on success, 1 on failure.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

}

这是 4 个输出文本文件之一的一部分:

user0   2745
user1001    18724
user1005    2405
user1009    16577
user1012    1710
user1016    10074
user1023    2173
user1027    791