Hadoop 减速器上的奇怪行为

Strange behavior on Hadoop's reducer

我有一个名为 Pair 的简单 class,它实现了 org.apache.hadoop.io.Writable。它包含两个字段并在 MapReduce 过程中用作值。

对于每个键,我想找到 Pair 的一个字段 (preco) 中具有最大值的对。在减速器中,以下代码产生预期结果:

float max = 0;
String country = "";
for (Pair p : values){
    if (p.getPreco().get() > max)
    {
        max = p.getPreco().get();
        country = p.getPais().toString();
    }
}
context.write(key, new Pair(new FloatWritable(max), new Text(country)));

另一方面,以下代码不会:

Pair max = new Pair();
for (Pair p : values)
    if (p.getPreco().get() > max.getPreco().get())
        max = p;

context.write(key, max);

第二个代码为每个键生成输入文件中与其关联的最后一个值,而不是最大值。

这种看似奇怪的行为是否有原因?

你有这个问题是因为 reducer 正在重用对象,所以它对值的迭代器总是传递给你同一个对象。因此这段代码:

max = p;

将始终引用 p 的当前值。您需要将数据复制到 max 才能正常工作,而不是引用该对象。这就是您的代码的第一个版本可以正常工作的原因。

通常在 Hadoop 中,我会在自定义可写对象上实现一个 .set() 方法,这是您将看到的常见模式。所以你的 Pair class 可能看起来有点像(它缺少接口方法等):

public class Pair implements Writable {

    public FloatWritable max = new FloatWritable();
    public Text country = new Text();

    public void set(Pair p) {
        this.max.set(p.max.get());
        this.country.set(p.country);
    }
}

您会将代码更改为:

Pair max = new Pair();
for (Pair p : values) {
    if (p.max().get() > max.max.get()) {
        max.set(p);
    }
}
context.write(key, max);

我还没有在 Pair 中创建 getters,所以代码稍微更改为直接访问 public class 变量。