Spark flatMap/reduce:如何扩展和避免 OutOfMemory?
Spark flatMap/reduce: How to scale and avoid OutOfMemory?
我正在将一些 map-reduce 代码迁移到 Spark 中,在函数中构造 Iterable to return 时遇到问题。
在 MR 代码中,我有一个按键分组的 reduce 函数,然后(使用 multipleOutputs)将迭代值并使用 write(在多个输出中,但这并不重要)像这样的代码(简化):
reduce(Key key, Iterable<Text> values) {
// ... some code
for (Text xml: values) {
multipleOutputs.write(key, val, directory);
}
}
但是,在 Spark 中,我翻译了一个地图,并将其归约为以下序列:
mapToPair -> groupByKey -> flatMap
按照建议...在某本书中。
mapToPair 基本上通过 functionMap 添加一个 Key,它根据记录上的一些值为该记录创建一个 Key。有时一个键可能有很高的基数。
JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() {
public Tuple2<Key, String> call(String value) {
//...
return functionMap.call(value);
}
});
rddPaired 应用了 RDD.groupByKey() 来获取 RDD 来提供 flatMap 函数:
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();
分组后,调用 flatMap 来执行 reduce。这里,operation 是一个转换:
public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
// some code...
List<String> out = new ArrayList<String>();
if (someConditionOnKey) {
// do a logic
Grouper grouper = new Grouper();
for (String xml : keyValue._2()) {
// group in a separate class
grouper.add(xml);
}
// operation is now performed on the whole group
out.add(operation(grouper));
} else {
for (String xml : keyValue._2()) {
out.add(operation(xml));
}
return out;
}
}
它工作正常...对于没有太多记录的键。实际上,当具有很多值的键进入 reduce."else" 时,它会因 OutOfMemory 而中断。
注意:我已经包含了"if"部分来解释我想要产生的逻辑,但是当输入"else"时失败发生...因为当数据输入"else",这通常意味着由于数据的性质,会有更多的值。
很明显,必须将所有分组值保存在 "out" 列表中,如果一个键有数百万条记录,它就不会缩放,因为它会将它们保存在内存中。我已经到了 OOM 发生的地步(是的,它是在执行上面的 "operation" 时要求内存 - 并且给出了 none。虽然这不是一个非常昂贵的内存操作)。
有什么方法可以避免这种情况以扩大规模吗?通过使用一些其他指令复制行为以更可扩展的方式达到相同的输出,或者能够将合并的值交给 Spark(就像我以前对 MR 所做的那样)...
在 flatMap
操作中做条件是低效的。您应该检查外部条件以创建 2 个不同的 RDD 并分别处理它们。
rddPaired.cache();
// groupFilterFunc will filter which items need grouping
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.filter(groupFilterFunc).groupByKey();
// processGroupedValuesFunction should call `operation` on group of all values with the same key and return the result
rddGrouped.mapValues(processGroupedValuesFunction);
// nogroupFilterFunc will filter which items don't need grouping
JavaPairRDD<Key, Iterable<String>> rddNoGrouped = rddPaired.filter(nogroupFilterFunc);
// processNoGroupedValuesFunction2 should call `operation` on a single value and return the result
rddNoGrouped.mapValues(processNoGroupedValuesFunction2);
我正在将一些 map-reduce 代码迁移到 Spark 中,在函数中构造 Iterable to return 时遇到问题。 在 MR 代码中,我有一个按键分组的 reduce 函数,然后(使用 multipleOutputs)将迭代值并使用 write(在多个输出中,但这并不重要)像这样的代码(简化):
reduce(Key key, Iterable<Text> values) {
// ... some code
for (Text xml: values) {
multipleOutputs.write(key, val, directory);
}
}
但是,在 Spark 中,我翻译了一个地图,并将其归约为以下序列: mapToPair -> groupByKey -> flatMap 按照建议...在某本书中。
mapToPair 基本上通过 functionMap 添加一个 Key,它根据记录上的一些值为该记录创建一个 Key。有时一个键可能有很高的基数。
JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() {
public Tuple2<Key, String> call(String value) {
//...
return functionMap.call(value);
}
});
rddPaired 应用了 RDD.groupByKey() 来获取 RDD 来提供 flatMap 函数:
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();
分组后,调用 flatMap 来执行 reduce。这里,operation 是一个转换:
public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
// some code...
List<String> out = new ArrayList<String>();
if (someConditionOnKey) {
// do a logic
Grouper grouper = new Grouper();
for (String xml : keyValue._2()) {
// group in a separate class
grouper.add(xml);
}
// operation is now performed on the whole group
out.add(operation(grouper));
} else {
for (String xml : keyValue._2()) {
out.add(operation(xml));
}
return out;
}
}
它工作正常...对于没有太多记录的键。实际上,当具有很多值的键进入 reduce."else" 时,它会因 OutOfMemory 而中断。
注意:我已经包含了"if"部分来解释我想要产生的逻辑,但是当输入"else"时失败发生...因为当数据输入"else",这通常意味着由于数据的性质,会有更多的值。
很明显,必须将所有分组值保存在 "out" 列表中,如果一个键有数百万条记录,它就不会缩放,因为它会将它们保存在内存中。我已经到了 OOM 发生的地步(是的,它是在执行上面的 "operation" 时要求内存 - 并且给出了 none。虽然这不是一个非常昂贵的内存操作)。
有什么方法可以避免这种情况以扩大规模吗?通过使用一些其他指令复制行为以更可扩展的方式达到相同的输出,或者能够将合并的值交给 Spark(就像我以前对 MR 所做的那样)...
在 flatMap
操作中做条件是低效的。您应该检查外部条件以创建 2 个不同的 RDD 并分别处理它们。
rddPaired.cache();
// groupFilterFunc will filter which items need grouping
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.filter(groupFilterFunc).groupByKey();
// processGroupedValuesFunction should call `operation` on group of all values with the same key and return the result
rddGrouped.mapValues(processGroupedValuesFunction);
// nogroupFilterFunc will filter which items don't need grouping
JavaPairRDD<Key, Iterable<String>> rddNoGrouped = rddPaired.filter(nogroupFilterFunc);
// processNoGroupedValuesFunction2 should call `operation` on a single value and return the result
rddNoGrouped.mapValues(processNoGroupedValuesFunction2);