在 MapReduce 中使用列表作为值 Returns 相同的值
Using List as Value in MapReduce Returns Identical Values
我有一个 MapReduce 作业,它输出 IntWritable 作为键和 Point(我创建的实现可写的对象)对象作为 map 函数的值。然后在 reduce 函数中,我使用 for-each 循环遍历 Points 的可迭代对象来创建一个列表:
@Override
public void reduce(IntWritable key, Iterable<Point> points, Context context) throws IOException, InterruptedException {
List<Point> pointList = new ArrayList<>();
for (Point point : points) {
pointList.add(point);
}
context.write(key, pointList);
}
问题是这个列表的大小是正确的,但是每个点都是完全一样的。我的 Point class 中的字段不是静态的,我在循环中单独打印了每个点以确保这些点是唯一的(它们是)。此外,我创建了一个单独的 class,它只创建了几个点并将它们添加到列表中,这似乎有效,这意味着 MapReduce 做了一些我不知道的事情。
如能帮助解决此问题,我们将不胜感激。
更新:
映射器代码 class:
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private IntWritable firstChar = new IntWritable();
private Point point = new Point();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, " ");
while(tokenizer.hasMoreTokens()) {
String atts = tokenizer.nextToken();
String cut = atts.substring(1, atts.length() - 1);
String[] nums = cut.split(",");
point.set(Double.parseDouble(nums[0]), Double.parseDouble(nums[1]), Double.parseDouble(nums[2]), Double.parseDouble(nums[3]));
context.write(one, point);
}
}
点class:
public class Point implements Writable {
public Double att1;
public Double att2;
public Double att3;
public Double att4;
public Point() {
}
public void set(Double att1, Double att2, Double att3, Double att4) {
this.att1 = att1;
this.att2 = att2;
this.att3 = att3;
this.att4 = att4;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeDouble(att1);
dataOutput.writeDouble(att2);
dataOutput.writeDouble(att3);
dataOutput.writeDouble(att4);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.att1 = dataInput.readDouble();
this.att2 = dataInput.readDouble();
this.att3 = dataInput.readDouble();
this.att4 = dataInput.readDouble();
}
@Override
public String toString() {
String output = "{" + att1 + ", " + att2 + ", " + att3 + ", " + att4 + "}";
return output;
}
问题出在你的减速器上。您不想将所有点存储在内存中。它们可能很大,Hadoop 会为您解决这个问题(即使以一种笨拙的方式)。
当遍历给定的 Iterable<Points>
时,每个 Point
实例都会被重复使用,因此它在给定时间只保留一个实例。
这意味着当您调用 points.next()
时,将发生以下两件事:
Point
实例被重新使用并设置下一个点数据
- 同样适用于
Key
实例。
在您的情况下,您会在列表中找到多次插入的 Point
的一个实例,并使用最后一个 Point
.
的数据进行设置
你不应该在你的减速器中保存 Writables
的实例或者应该克隆它们。
您可以在此处阅读有关此问题的更多信息
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/
我有一个 MapReduce 作业,它输出 IntWritable 作为键和 Point(我创建的实现可写的对象)对象作为 map 函数的值。然后在 reduce 函数中,我使用 for-each 循环遍历 Points 的可迭代对象来创建一个列表:
@Override
public void reduce(IntWritable key, Iterable<Point> points, Context context) throws IOException, InterruptedException {
List<Point> pointList = new ArrayList<>();
for (Point point : points) {
pointList.add(point);
}
context.write(key, pointList);
}
问题是这个列表的大小是正确的,但是每个点都是完全一样的。我的 Point class 中的字段不是静态的,我在循环中单独打印了每个点以确保这些点是唯一的(它们是)。此外,我创建了一个单独的 class,它只创建了几个点并将它们添加到列表中,这似乎有效,这意味着 MapReduce 做了一些我不知道的事情。
如能帮助解决此问题,我们将不胜感激。
更新: 映射器代码 class:
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private IntWritable firstChar = new IntWritable();
private Point point = new Point();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, " ");
while(tokenizer.hasMoreTokens()) {
String atts = tokenizer.nextToken();
String cut = atts.substring(1, atts.length() - 1);
String[] nums = cut.split(",");
point.set(Double.parseDouble(nums[0]), Double.parseDouble(nums[1]), Double.parseDouble(nums[2]), Double.parseDouble(nums[3]));
context.write(one, point);
}
}
点class:
public class Point implements Writable {
public Double att1;
public Double att2;
public Double att3;
public Double att4;
public Point() {
}
public void set(Double att1, Double att2, Double att3, Double att4) {
this.att1 = att1;
this.att2 = att2;
this.att3 = att3;
this.att4 = att4;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeDouble(att1);
dataOutput.writeDouble(att2);
dataOutput.writeDouble(att3);
dataOutput.writeDouble(att4);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.att1 = dataInput.readDouble();
this.att2 = dataInput.readDouble();
this.att3 = dataInput.readDouble();
this.att4 = dataInput.readDouble();
}
@Override
public String toString() {
String output = "{" + att1 + ", " + att2 + ", " + att3 + ", " + att4 + "}";
return output;
}
问题出在你的减速器上。您不想将所有点存储在内存中。它们可能很大,Hadoop 会为您解决这个问题(即使以一种笨拙的方式)。
当遍历给定的 Iterable<Points>
时,每个 Point
实例都会被重复使用,因此它在给定时间只保留一个实例。
这意味着当您调用 points.next()
时,将发生以下两件事:
Point
实例被重新使用并设置下一个点数据- 同样适用于
Key
实例。
在您的情况下,您会在列表中找到多次插入的 Point
的一个实例,并使用最后一个 Point
.
你不应该在你的减速器中保存 Writables
的实例或者应该克隆它们。
您可以在此处阅读有关此问题的更多信息
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/