reduce 阶段的输入不是我在 Hadoop 中所期望的 (Java)
Input of the reduce phase is not what I expect in Hadoop (Java)
我正在使用 MapReduce 在 Hadoop 中开发一个非常简单的图形分析工具。我有一个如下图所示的图(每行代表一条边——实际上,这是一个三角形图):
1 3
3 1
3 2
2 3
现在,我想使用 MapReduce 来计算此图中的三角形(显然是一个)。它仍在进行中,在第一阶段,我尝试获取每个顶点的所有邻居列表。
我的主要 class 如下所示:
public class TriangleCount {
public static void main( String[] args ) throws Exception {
// remove the old output directory
FileSystem fs = FileSystem.get(new Configuration());
fs.delete(new Path("output/"), true);
JobConf firstPhaseJob = new JobConf(FirstPhase.class);
firstPhaseJob.setOutputKeyClass(IntWritable.class);
firstPhaseJob.setOutputValueClass(IntWritable.class);
firstPhaseJob.setMapperClass(FirstPhase.Map.class);
firstPhaseJob.setCombinerClass(FirstPhase.Reduce.class);
firstPhaseJob.setReducerClass(FirstPhase.Reduce.class);
FileInputFormat.setInputPaths(firstPhaseJob, new Path("input/"));
FileOutputFormat.setOutputPath(firstPhaseJob, new Path("output/"));
JobClient.runJob(firstPhaseJob);
}
}
我的 Mapper 和 Reducer 实现是这样的,它们都很简单:
public class FirstPhase {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {
@Override
public void map(LongWritable longWritable, Text graphLine, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
StringTokenizer tokenizer = new StringTokenizer(graphLine.toString());
int n1 = Integer.parseInt(tokenizer.nextToken());
int n2 = Integer.parseInt(tokenizer.nextToken());
if(n1 > n2) {
System.out.println("emitting (" + new IntWritable(n1) + ", " + new IntWritable(n2) + ")");
outputCollector.collect(new IntWritable(n1), new IntWritable(n2));
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
@Override
public void reduce(IntWritable key, Iterator<IntWritable> iterator, OutputCollector<IntWritable, Text> outputCollector, Reporter reporter) throws IOException {
List<IntWritable> nNodes = new ArrayList<>();
while(iterator.hasNext()) {
nNodes.add(iterator.next());
}
System.out.println("key: " + key + ", list: " + nNodes);
// create pairs and emit these
for(IntWritable n1 : nNodes) {
for(IntWritable n2 : nNodes) {
outputCollector.collect(key, new Text(n1.toString() + " " + n2.toString()));
}
}
}
}
}
我在程序中添加了一些日志记录。在地图阶段,我打印我正在发射的对。在reduce阶段,我打印reduce的输入。我得到以下输出:
emitting (3, 1)
emitting (3, 2)
key: 3, list: [1, 1]
reduce 函数的输入不是我所期望的。我希望它是 [1, 2] 而不是 [1, 1]。我相信 Hadoop 会自动组合映射阶段输出的所有发射对,但我在这里遗漏了什么吗?任何帮助或解释将不胜感激。
对于刚开始使用 Hadoop MapReduce 的人来说,这是一个典型的问题。
问题出在你的减速器上。当遍历给定的 Iterator<IntWritable>
时,每个 IntWritable
实例都会被重新使用,因此在给定时间它只保留一个实例。
这意味着当您调用 iterator.next()
时,您第一个保存的 IntWritable
实例将设置为新值。
您可以在此处阅读有关此问题的更多信息
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/
我正在使用 MapReduce 在 Hadoop 中开发一个非常简单的图形分析工具。我有一个如下图所示的图(每行代表一条边——实际上,这是一个三角形图):
1 3
3 1
3 2
2 3
现在,我想使用 MapReduce 来计算此图中的三角形(显然是一个)。它仍在进行中,在第一阶段,我尝试获取每个顶点的所有邻居列表。
我的主要 class 如下所示:
public class TriangleCount {
public static void main( String[] args ) throws Exception {
// remove the old output directory
FileSystem fs = FileSystem.get(new Configuration());
fs.delete(new Path("output/"), true);
JobConf firstPhaseJob = new JobConf(FirstPhase.class);
firstPhaseJob.setOutputKeyClass(IntWritable.class);
firstPhaseJob.setOutputValueClass(IntWritable.class);
firstPhaseJob.setMapperClass(FirstPhase.Map.class);
firstPhaseJob.setCombinerClass(FirstPhase.Reduce.class);
firstPhaseJob.setReducerClass(FirstPhase.Reduce.class);
FileInputFormat.setInputPaths(firstPhaseJob, new Path("input/"));
FileOutputFormat.setOutputPath(firstPhaseJob, new Path("output/"));
JobClient.runJob(firstPhaseJob);
}
}
我的 Mapper 和 Reducer 实现是这样的,它们都很简单:
public class FirstPhase {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {
@Override
public void map(LongWritable longWritable, Text graphLine, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
StringTokenizer tokenizer = new StringTokenizer(graphLine.toString());
int n1 = Integer.parseInt(tokenizer.nextToken());
int n2 = Integer.parseInt(tokenizer.nextToken());
if(n1 > n2) {
System.out.println("emitting (" + new IntWritable(n1) + ", " + new IntWritable(n2) + ")");
outputCollector.collect(new IntWritable(n1), new IntWritable(n2));
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
@Override
public void reduce(IntWritable key, Iterator<IntWritable> iterator, OutputCollector<IntWritable, Text> outputCollector, Reporter reporter) throws IOException {
List<IntWritable> nNodes = new ArrayList<>();
while(iterator.hasNext()) {
nNodes.add(iterator.next());
}
System.out.println("key: " + key + ", list: " + nNodes);
// create pairs and emit these
for(IntWritable n1 : nNodes) {
for(IntWritable n2 : nNodes) {
outputCollector.collect(key, new Text(n1.toString() + " " + n2.toString()));
}
}
}
}
}
我在程序中添加了一些日志记录。在地图阶段,我打印我正在发射的对。在reduce阶段,我打印reduce的输入。我得到以下输出:
emitting (3, 1)
emitting (3, 2)
key: 3, list: [1, 1]
reduce 函数的输入不是我所期望的。我希望它是 [1, 2] 而不是 [1, 1]。我相信 Hadoop 会自动组合映射阶段输出的所有发射对,但我在这里遗漏了什么吗?任何帮助或解释将不胜感激。
对于刚开始使用 Hadoop MapReduce 的人来说,这是一个典型的问题。
问题出在你的减速器上。当遍历给定的 Iterator<IntWritable>
时,每个 IntWritable
实例都会被重新使用,因此在给定时间它只保留一个实例。
这意味着当您调用 iterator.next()
时,您第一个保存的 IntWritable
实例将设置为新值。
您可以在此处阅读有关此问题的更多信息
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/