MapReduce 中的奇怪行为,值被覆盖
Weird behaviour in MapReduce, values get overwritten
我一直在尝试在 Hadoop 中使用 MapReduce 实现 TfIdf 算法。我的 TFIDF 分 4 个步骤进行(我称它们为 MR1、MR2、MR3、MR4)。这是我的 input/outputs:
MR1: (offset, line) ==(Map)==> (word|file, 1) ==(Reduce)==> (word|file, n)
MR2: (word|file, n) ==(Map)==> (file, word|n) ==(Reduce)==> (word|file, n|N)
MR3: (word|file, n|N) ==(Map)==> (word, file|n|N|1) ==(Reduce)==> (word|file, n|N|M)
MR4: (word|file, n|N|M) ==(Map)==> (word|file, n/N log D/M)
其中 n =(单词,文件)不同对的数量,N = 每个文件中的单词数,M = 每个单词出现的文档数,D = 文档数。
从 MR1 阶段开始,我得到了正确的输出,例如:hello|hdfs://..... 2
对于 MR2 阶段,我预计:hello|hdfs://....... 2|192
但我得到 2|hello|hdfs://...... 192|192
我很确定我的代码是正确的,每次我尝试在减少阶段向我的“值”添加一个字符串以查看发生了什么时,相同的字符串在关键部分被“传送”。
示例:gg|word|hdfs://.... gg|192
这是我的 MR1 代码:
public class MR1 {
/* Classe Map :
* Entree : (offset, line)
* Sortie : (word|file, 1)
* Sends 1 for each word per line.
*/
static class MR1Mapper extends Mapper <LongWritable, Text, Text, IntWritable > {
public void map (LongWritable key, Text value, Context contexte)
throws IOException, InterruptedException {
// Recuperation du nom du fichier associe au "split"
FileSplit split = (FileSplit) contexte.getInputSplit();
String fileName = split.getPath().toString();
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, "' \t:,;:!?./-_()[]{}\"&%<>");
while (tokenizer.hasMoreTokens()) {
String word = tokenizer.nextToken().toLowerCase();
contexte.write(new Text(word + "|" + fileName), new IntWritable(1));
}
}
}
/* Class Reducer : compte le nombre d'occurrence total par mot/fichier
* Entree : (word|file, x)
* Sortie : (word|file, n)
*/
public static class MR1Reducer extends Reducer <Text, IntWritable, Text, IntWritable > {
public void reduce(Text key, Iterable < IntWritable > values, Context contexte)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val:values) {
sum += val.get();
}
contexte.write(key, new IntWritable(sum));
}
}
public static void main(String args[]) throws Exception {
if (args.length != 2) {
System.err.println(args.length + "(" + args[0] + "," + args[1] + ")");
System.err. println("Usage : MR1 <source> <destination>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MR1.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MR1Mapper.class);
job.setCombinerClass (MR1Reducer.class) ;
job.setReducerClass(MR1Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
这是我的 MR2 代码:
public class MR2 {
/* Map : on isole le nom du fichier.
* Entree : (word|file, n)
* Sortie : (file, word|n)
*/
static class MR2Mapper extends Mapper <Text, Text, Text, Text> {
public void map (Text key, Text value, Context contexte)
throws IOException, InterruptedException {
String skey = key.toString () ;
String word = skey.substring (0, skey.indexOf ("|")) ;
String fileName = skey.substring (skey.indexOf ("|")+1) ;
contexte.write (new Text (fileName), new Text (word + "|" + value)) ;
}
}
/* Reduce : on somme le nombre d'occurence de chaque mot du fichier
* Entree : (file, word|n)
* Sortie : (word|file, n|N)
*/
public static class MR2Reducer extends Reducer <Text, Text, Text, Text> {
public void reduce (Text key, Iterable <Text> values, Context contexte)
throws IOException, InterruptedException {
int N = 0 ;
// les iterateurs sont utilisable qu'une seule fois. Donc il faut
// garder les valeurs dans une arraylist pour les reparcourir.
ArrayList <String> altmp = new ArrayList <String> () ;
// 1ere boucle : calcul de la somme totale des mots
for (Text val : values) {
String sval = val.toString () ;
String sn = sval.substring (sval.indexOf ("|")+1) ;
int n = Integer.parseInt (sn) ;
altmp.add (val.toString ()) ;
N += n ;
}
// 2eme boucle : calcul de la somme totale des mots
Iterator <String> it = altmp.iterator () ;
while (it.hasNext ()) {
String val = it.next () ;
String sval = val.toString () ;
String word = sval.substring (0, sval.indexOf ("|")) ;
String sn = sval.substring (sval.indexOf ("|")+1) ;
int n = Integer.parseInt (sn) ;
// I tried to replace n with "gg" here, still same teleporting issue
contexte.write (new Text (word + "|" + clef.toString ()), new Text (n + "|" + N)) ;
}
}
}
public static void main (String args []) throws Exception {
if (args.length != 2) {
System.err.println (args.length + "("+args [0] + "," +args [1] + ")") ;
System.err.println ("Usage : MR2 <source> <destination>") ;
System.exit (-1) ;
}
Job job = new Job () ;
job.setJarByClass (MR2.class) ;
// Le fichier HDFS a utiliser en entree
FileInputFormat.addInputPath (job, new Path (args [0])) ;
FileOutputFormat.setOutputPath (job, new Path (args [1])) ;
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapperClass (MR2Mapper.class) ;
job.setCombinerClass (MR2Reducer.class) ;
job.setReducerClass (MR2Reducer.class) ;
job.setMapOutputKeyClass (Text.class) ;
job.setMapOutputValueClass (Text.class) ;
job.setOutputKeyClass (Text.class) ;
job.setOutputValueClass (Text.class) ;
System.exit (job.waitForCompletion (true) ? 0 : 1) ;
}
}
如有任何帮助,我们将不胜感激。
这是 Combiner 的错。您在驱动程序 class 中指定要在以下命令中将 MR2Reducer
用作组合器和减速器:
job.setCombinerClass (MR2Reducer.class) ;
job.setReducerClass (MR2Reducer.class) ;
但是,一个Combiner是运行在一个Map
个instance的范围内,而一个Reducer是在所有Mappers执行完之后串联运行的。通过使用 Combiner,您实际上是在执行每个单独的 Mapper 任务后立即执行 MR2Reducer
,因此它计算 N
并在每个 Mapper 任务中拆分给定键值输入的复合值范围。
这基本上导致 Reduce
阶段通过输入 (word|file, n|N)
键值对模式(也就是 MR2Reducer
任务的输出 在 Reduce
阶段之前)而不是所需的 (file, word|n)
模式。通过在不知不觉中使用错误模式,您错误地拆分了复合值,并且输出的键值对看起来不稳定,错误,and/or 反转。
要解决此问题,您可以:
- 创建一个自定义 Combiner,它将具有与当前
MR2Reducer
相同的命令,然后更改 MR2Reducer
class 以接收 [=17= 中的键值对] 模式(不推荐,因为它可能会抵消可伸缩性和执行时间方面的所有好处,并且只会使您的 MapReduce 作业变得更加复杂),或
- 从您的驱动程序 class 中删除或注释掉
job.setCombinerClass (MR2Reducer.class) ;
行以保持简单和实用,以便您将来可以从那里构建。
为了展示这一点,我在我的机器上本地使用了你的 MR1
、MR2
classes,删除了 job.setCombinerClass (MR2Reducer.class) ;
行并使用了 this 输入存储在 HDFS 中以验证输出的键值对是否符合要求。这是执行后的输出片段:
balance|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suppress|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
back|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 4|661
after|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suspicious|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swang|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swinging|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
我一直在尝试在 Hadoop 中使用 MapReduce 实现 TfIdf 算法。我的 TFIDF 分 4 个步骤进行(我称它们为 MR1、MR2、MR3、MR4)。这是我的 input/outputs:
MR1: (offset, line) ==(Map)==> (word|file, 1) ==(Reduce)==> (word|file, n)
MR2: (word|file, n) ==(Map)==> (file, word|n) ==(Reduce)==> (word|file, n|N)
MR3: (word|file, n|N) ==(Map)==> (word, file|n|N|1) ==(Reduce)==> (word|file, n|N|M)
MR4: (word|file, n|N|M) ==(Map)==> (word|file, n/N log D/M)
其中 n =(单词,文件)不同对的数量,N = 每个文件中的单词数,M = 每个单词出现的文档数,D = 文档数。
从 MR1 阶段开始,我得到了正确的输出,例如:hello|hdfs://..... 2
对于 MR2 阶段,我预计:hello|hdfs://....... 2|192
但我得到 2|hello|hdfs://...... 192|192
我很确定我的代码是正确的,每次我尝试在减少阶段向我的“值”添加一个字符串以查看发生了什么时,相同的字符串在关键部分被“传送”。
示例:gg|word|hdfs://.... gg|192
这是我的 MR1 代码:
public class MR1 {
/* Classe Map :
* Entree : (offset, line)
* Sortie : (word|file, 1)
* Sends 1 for each word per line.
*/
static class MR1Mapper extends Mapper <LongWritable, Text, Text, IntWritable > {
public void map (LongWritable key, Text value, Context contexte)
throws IOException, InterruptedException {
// Recuperation du nom du fichier associe au "split"
FileSplit split = (FileSplit) contexte.getInputSplit();
String fileName = split.getPath().toString();
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, "' \t:,;:!?./-_()[]{}\"&%<>");
while (tokenizer.hasMoreTokens()) {
String word = tokenizer.nextToken().toLowerCase();
contexte.write(new Text(word + "|" + fileName), new IntWritable(1));
}
}
}
/* Class Reducer : compte le nombre d'occurrence total par mot/fichier
* Entree : (word|file, x)
* Sortie : (word|file, n)
*/
public static class MR1Reducer extends Reducer <Text, IntWritable, Text, IntWritable > {
public void reduce(Text key, Iterable < IntWritable > values, Context contexte)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val:values) {
sum += val.get();
}
contexte.write(key, new IntWritable(sum));
}
}
public static void main(String args[]) throws Exception {
if (args.length != 2) {
System.err.println(args.length + "(" + args[0] + "," + args[1] + ")");
System.err. println("Usage : MR1 <source> <destination>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MR1.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MR1Mapper.class);
job.setCombinerClass (MR1Reducer.class) ;
job.setReducerClass(MR1Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
这是我的 MR2 代码:
public class MR2 {
/* Map : on isole le nom du fichier.
* Entree : (word|file, n)
* Sortie : (file, word|n)
*/
static class MR2Mapper extends Mapper <Text, Text, Text, Text> {
public void map (Text key, Text value, Context contexte)
throws IOException, InterruptedException {
String skey = key.toString () ;
String word = skey.substring (0, skey.indexOf ("|")) ;
String fileName = skey.substring (skey.indexOf ("|")+1) ;
contexte.write (new Text (fileName), new Text (word + "|" + value)) ;
}
}
/* Reduce : on somme le nombre d'occurence de chaque mot du fichier
* Entree : (file, word|n)
* Sortie : (word|file, n|N)
*/
public static class MR2Reducer extends Reducer <Text, Text, Text, Text> {
public void reduce (Text key, Iterable <Text> values, Context contexte)
throws IOException, InterruptedException {
int N = 0 ;
// les iterateurs sont utilisable qu'une seule fois. Donc il faut
// garder les valeurs dans une arraylist pour les reparcourir.
ArrayList <String> altmp = new ArrayList <String> () ;
// 1ere boucle : calcul de la somme totale des mots
for (Text val : values) {
String sval = val.toString () ;
String sn = sval.substring (sval.indexOf ("|")+1) ;
int n = Integer.parseInt (sn) ;
altmp.add (val.toString ()) ;
N += n ;
}
// 2eme boucle : calcul de la somme totale des mots
Iterator <String> it = altmp.iterator () ;
while (it.hasNext ()) {
String val = it.next () ;
String sval = val.toString () ;
String word = sval.substring (0, sval.indexOf ("|")) ;
String sn = sval.substring (sval.indexOf ("|")+1) ;
int n = Integer.parseInt (sn) ;
// I tried to replace n with "gg" here, still same teleporting issue
contexte.write (new Text (word + "|" + clef.toString ()), new Text (n + "|" + N)) ;
}
}
}
public static void main (String args []) throws Exception {
if (args.length != 2) {
System.err.println (args.length + "("+args [0] + "," +args [1] + ")") ;
System.err.println ("Usage : MR2 <source> <destination>") ;
System.exit (-1) ;
}
Job job = new Job () ;
job.setJarByClass (MR2.class) ;
// Le fichier HDFS a utiliser en entree
FileInputFormat.addInputPath (job, new Path (args [0])) ;
FileOutputFormat.setOutputPath (job, new Path (args [1])) ;
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapperClass (MR2Mapper.class) ;
job.setCombinerClass (MR2Reducer.class) ;
job.setReducerClass (MR2Reducer.class) ;
job.setMapOutputKeyClass (Text.class) ;
job.setMapOutputValueClass (Text.class) ;
job.setOutputKeyClass (Text.class) ;
job.setOutputValueClass (Text.class) ;
System.exit (job.waitForCompletion (true) ? 0 : 1) ;
}
}
如有任何帮助,我们将不胜感激。
这是 Combiner 的错。您在驱动程序 class 中指定要在以下命令中将 MR2Reducer
用作组合器和减速器:
job.setCombinerClass (MR2Reducer.class) ;
job.setReducerClass (MR2Reducer.class) ;
但是,一个Combiner是运行在一个Map
个instance的范围内,而一个Reducer是在所有Mappers执行完之后串联运行的。通过使用 Combiner,您实际上是在执行每个单独的 Mapper 任务后立即执行 MR2Reducer
,因此它计算 N
并在每个 Mapper 任务中拆分给定键值输入的复合值范围。
这基本上导致 Reduce
阶段通过输入 (word|file, n|N)
键值对模式(也就是 MR2Reducer
任务的输出 在 Reduce
阶段之前)而不是所需的 (file, word|n)
模式。通过在不知不觉中使用错误模式,您错误地拆分了复合值,并且输出的键值对看起来不稳定,错误,and/or 反转。
要解决此问题,您可以:
- 创建一个自定义 Combiner,它将具有与当前
MR2Reducer
相同的命令,然后更改MR2Reducer
class 以接收 [=17= 中的键值对] 模式(不推荐,因为它可能会抵消可伸缩性和执行时间方面的所有好处,并且只会使您的 MapReduce 作业变得更加复杂),或 - 从您的驱动程序 class 中删除或注释掉
job.setCombinerClass (MR2Reducer.class) ;
行以保持简单和实用,以便您将来可以从那里构建。
为了展示这一点,我在我的机器上本地使用了你的 MR1
、MR2
classes,删除了 job.setCombinerClass (MR2Reducer.class) ;
行并使用了 this 输入存储在 HDFS 中以验证输出的键值对是否符合要求。这是执行后的输出片段:
balance|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suppress|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
back|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 4|661
after|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suspicious|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swang|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swinging|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661