MapReduce 中的奇怪行为,值被覆盖

Weird behaviour in MapReduce, values get overwritten

我一直在尝试在 Hadoop 中使用 MapReduce 实现 TfIdf 算法。我的 TFIDF 分 4 个步骤进行(我称它们为 MR1、MR2、MR3、MR4)。这是我的 input/outputs:

MR1: (offset, line) ==(Map)==> (word|file, 1) ==(Reduce)==> (word|file, n)

MR2: (word|file, n) ==(Map)==> (file, word|n) ==(Reduce)==> (word|file, n|N)

MR3: (word|file, n|N) ==(Map)==> (word, file|n|N|1) ==(Reduce)==> (word|file, n|N|M)

MR4: (word|file, n|N|M) ==(Map)==> (word|file, n/N log D/M)

其中 n =(单词,文件)不同对的数量,N = 每个文件中的单词数,M = 每个单词出现的文档数,D = 文档数。

从 MR1 阶段开始,我得到了正确的输出,例如:hello|hdfs://..... 2

对于 MR2 阶段,我预计:hello|hdfs://....... 2|192 但我得到 2|hello|hdfs://...... 192|192

我很确定我的代码是正确的,每次我尝试在减少阶段向我的“值”添加一个字符串以查看发生了什么时,相同的字符串在关键部分被“传送”。

示例:gg|word|hdfs://.... gg|192

这是我的 MR1 代码:

public class MR1 {
    /* Classe Map :
    *  Entree : (offset, line)
    *  Sortie : (word|file, 1)
    *  Sends 1 for each word per line.
    */
    static class MR1Mapper extends Mapper <LongWritable, Text, Text, IntWritable > {
        public void map (LongWritable key, Text value, Context contexte)
                       throws IOException, InterruptedException {
            // Recuperation du nom du fichier associe au "split" 
            FileSplit split = (FileSplit) contexte.getInputSplit();
            String fileName = split.getPath().toString();
    
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, "' \t:,;:!?./-_()[]{}\"&%<>");
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken().toLowerCase();
                contexte.write(new Text(word + "|" + fileName), new IntWritable(1));
            }
        }
    } 
    
    /* Class Reducer : compte le nombre d'occurrence total par mot/fichier
    * Entree : (word|file, x)
    * Sortie : (word|file, n)
    */
    public static class MR1Reducer extends Reducer <Text, IntWritable, Text, IntWritable > {
        public void reduce(Text key, Iterable < IntWritable > values, Context contexte) 
                    throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val:values) {
                sum += val.get();
            } 
            contexte.write(key, new IntWritable(sum));
        }
    } 
    
    public static void main(String args[]) throws Exception {
        if (args.length != 2) {
            System.err.println(args.length + "(" + args[0] + "," + args[1] + ")");
            System.err.  println("Usage : MR1 <source> <destination>");
            System.exit(-1);
        }
    
        Job job = new Job();
        job.setJarByClass(MR1.class);
    
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        job.setMapperClass(MR1Mapper.class);
        job.setCombinerClass (MR1Reducer.class) ;
        job.setReducerClass(MR1Reducer.class);
    
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

这是我的 MR2 代码:

public class MR2 {
    /* Map : on isole le nom du fichier.
    * Entree : (word|file, n)
    * Sortie : (file, word|n)
    */
    static class MR2Mapper extends Mapper <Text, Text, Text, Text> {
        public void map (Text key, Text value, Context contexte) 
                    throws IOException, InterruptedException {
            String skey = key.toString () ;
            String word = skey.substring (0, skey.indexOf ("|")) ;
            String fileName = skey.substring (skey.indexOf ("|")+1) ;
            contexte.write (new Text (fileName), new Text (word + "|" + value)) ;
        }
    }
    
    /* Reduce : on somme le nombre d'occurence de chaque mot du fichier
    * Entree : (file, word|n)
    * Sortie : (word|file, n|N)
    */
    public static class MR2Reducer extends Reducer <Text, Text, Text, Text> {
        public void reduce (Text key, Iterable <Text> values, Context contexte) 
                    throws IOException, InterruptedException {
            int N = 0 ;
    
            // les iterateurs sont utilisable qu'une seule fois. Donc il faut
            // garder les valeurs dans une arraylist pour les reparcourir.
            ArrayList <String> altmp = new ArrayList <String> () ;
        
            // 1ere boucle : calcul de la somme totale des mots
            for (Text val : values) {
                String sval = val.toString () ;
                String sn = sval.substring (sval.indexOf ("|")+1) ;
                int n = Integer.parseInt (sn) ;
            
                altmp.add (val.toString ()) ;
                N += n ;
            }
    
            // 2eme boucle : calcul de la somme totale des mots
            Iterator <String> it = altmp.iterator () ;
            while (it.hasNext ()) {
                String val = it.next () ;
                String sval = val.toString () ;
                String word = sval.substring (0, sval.indexOf ("|")) ;
                String sn = sval.substring (sval.indexOf ("|")+1) ;
                int n = Integer.parseInt (sn) ;
            
                // I tried to replace n with "gg" here, still same teleporting issue
                contexte.write (new Text (word + "|" + clef.toString ()), new Text (n + "|" + N)) ; 
            }
        }
    }
    
    public static void main (String args []) throws Exception {
        if (args.length != 2) {
            System.err.println (args.length + "("+args [0] + "," +args [1] + ")") ;
            System.err.println ("Usage : MR2 <source> <destination>") ;
            System.exit (-1) ;
        }
    
        Job job = new Job () ;
        job.setJarByClass (MR2.class) ;
    
        // Le fichier HDFS a utiliser en entree
        FileInputFormat.addInputPath (job, new Path (args [0])) ;
        FileOutputFormat.setOutputPath (job, new Path (args [1])) ;
    
        job.setInputFormatClass(KeyValueTextInputFormat.class);
    
        job.setMapperClass (MR2Mapper.class) ;
        job.setCombinerClass (MR2Reducer.class) ;
        job.setReducerClass (MR2Reducer.class) ;
    
        job.setMapOutputKeyClass (Text.class) ;
        job.setMapOutputValueClass (Text.class) ;
    
        job.setOutputKeyClass (Text.class) ;
        job.setOutputValueClass (Text.class) ;
    
        System.exit (job.waitForCompletion (true) ? 0 : 1) ;
    }
}

如有任何帮助,我们将不胜感激。

这是 Combiner 的错。您在驱动程序 class 中指定要在以下命令中将 MR2Reducer 用作组合器和减速器:

job.setCombinerClass (MR2Reducer.class) ;
job.setReducerClass (MR2Reducer.class) ;

但是,一个Combiner是运行在一个Map个instance的范围内,而一个Reducer是在所有Mappers执行完之后串联运行的。通过使用 Combiner,您实际上是在执行每个单独的 Mapper 任务后立即执行 MR2Reducer,因此它计算 N 并在每个 Mapper 任务中拆分给定键值输入的复合值范围。

这基本上导致 Reduce 阶段通过输入 (word|file, n|N) 键值对模式(也就是 MR2Reducer 任务的输出 Reduce 阶段之前)而不是所需的 (file, word|n) 模式。通过在不知不觉中使用错误模式,您错误地拆分了复合值,并且输出的键值对看起来不稳定,错误,and/or 反转。

要解决此问题,您可以:

  • 创建一个自定义 Combiner,它将具有与当前 MR2Reducer 相同的命令,然后更改 MR2Reducer class 以接收 [=17= 中的键值对] 模式(不推荐,因为它可能会抵消可伸缩性和执行时间方面的所有好处,并且只会使您的 MapReduce 作业变得更加复杂),或
  • 从您的驱动程序 class 中删除或注释掉 job.setCombinerClass (MR2Reducer.class) ; 行以保持简单和实用,以便您将来可以从那里构建。

为了展示这一点,我在我的机器上本地使用了你的 MR1MR2 classes,删除了 job.setCombinerClass (MR2Reducer.class) ; 行并使用了 this 输入存储在 HDFS 中以验证输出的键值对是否符合要求。这是执行后的输出片段:

balance|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt    1|661
suppress|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt   1|661
back|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt       4|661
after|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt      1|661
suspicious|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swang|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt      2|661
swinging|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt   1|661