Hadoop 减速器上的奇怪行为
Strange behavior on Hadoop's reducer
我有一个名为 Pair
的简单 class,它实现了 org.apache.hadoop.io.Writable
。它包含两个字段并在 MapReduce 过程中用作值。
对于每个键,我想找到 Pair 的一个字段 (preco) 中具有最大值的对。在减速器中,以下代码产生预期结果:
float max = 0;
String country = "";
for (Pair p : values){
if (p.getPreco().get() > max)
{
max = p.getPreco().get();
country = p.getPais().toString();
}
}
context.write(key, new Pair(new FloatWritable(max), new Text(country)));
另一方面,以下代码不会:
Pair max = new Pair();
for (Pair p : values)
if (p.getPreco().get() > max.getPreco().get())
max = p;
context.write(key, max);
第二个代码为每个键生成输入文件中与其关联的最后一个值,而不是最大值。
这种看似奇怪的行为是否有原因?
你有这个问题是因为 reducer 正在重用对象,所以它对值的迭代器总是传递给你同一个对象。因此这段代码:
max = p;
将始终引用 p
的当前值。您需要将数据复制到 max
才能正常工作,而不是引用该对象。这就是您的代码的第一个版本可以正常工作的原因。
通常在 Hadoop 中,我会在自定义可写对象上实现一个 .set()
方法,这是您将看到的常见模式。所以你的 Pair
class 可能看起来有点像(它缺少接口方法等):
public class Pair implements Writable {
public FloatWritable max = new FloatWritable();
public Text country = new Text();
public void set(Pair p) {
this.max.set(p.max.get());
this.country.set(p.country);
}
}
您会将代码更改为:
Pair max = new Pair();
for (Pair p : values) {
if (p.max().get() > max.max.get()) {
max.set(p);
}
}
context.write(key, max);
我还没有在 Pair
中创建 getters
,所以代码稍微更改为直接访问 public class 变量。
我有一个名为 Pair
的简单 class,它实现了 org.apache.hadoop.io.Writable
。它包含两个字段并在 MapReduce 过程中用作值。
对于每个键,我想找到 Pair 的一个字段 (preco) 中具有最大值的对。在减速器中,以下代码产生预期结果:
float max = 0;
String country = "";
for (Pair p : values){
if (p.getPreco().get() > max)
{
max = p.getPreco().get();
country = p.getPais().toString();
}
}
context.write(key, new Pair(new FloatWritable(max), new Text(country)));
另一方面,以下代码不会:
Pair max = new Pair();
for (Pair p : values)
if (p.getPreco().get() > max.getPreco().get())
max = p;
context.write(key, max);
第二个代码为每个键生成输入文件中与其关联的最后一个值,而不是最大值。
这种看似奇怪的行为是否有原因?
你有这个问题是因为 reducer 正在重用对象,所以它对值的迭代器总是传递给你同一个对象。因此这段代码:
max = p;
将始终引用 p
的当前值。您需要将数据复制到 max
才能正常工作,而不是引用该对象。这就是您的代码的第一个版本可以正常工作的原因。
通常在 Hadoop 中,我会在自定义可写对象上实现一个 .set()
方法,这是您将看到的常见模式。所以你的 Pair
class 可能看起来有点像(它缺少接口方法等):
public class Pair implements Writable {
public FloatWritable max = new FloatWritable();
public Text country = new Text();
public void set(Pair p) {
this.max.set(p.max.get());
this.country.set(p.country);
}
}
您会将代码更改为:
Pair max = new Pair();
for (Pair p : values) {
if (p.max().get() > max.max.get()) {
max.set(p);
}
}
context.write(key, max);
我还没有在 Pair
中创建 getters
,所以代码稍微更改为直接访问 public class 变量。