处理一对具有严重偏斜数据的 RDD 时性能不佳
Poor performance processing a pair RDD with very skewed data
我有一个包含数百万个键值对的 RDD 对,其中每个值都是一个列表,可能包含单个元素或数十亿个元素。这会导致性能不佳,因为大型组会阻塞集群的节点数小时,而需要几秒钟的组无法并行处理,因为整个集群已经很忙。
有什么可以改进的吗?
编辑:
给我带来问题的操作是 flatMap
,其中分析了给定键的整个列表。键未被触及,该操作将列表中的每个元素与列表的其余部分进行比较,这需要花费大量时间,但不幸的是它必须完成。这意味着整个列表需要同时在同一个节点中。生成的 RDD 将包含一个子列表,具体取决于 flatMap
.
中计算的值
在这种情况下我不能使用广播变量,因为在不同的键值对之间不会使用公共数据。至于分区器,根据 O'Reilly Learning Spark 一书,这种操作不会从分区器中受益,因为不涉及混洗(尽管我不确定这是否属实)。分区程序可以在这种情况下提供帮助吗?
第二次编辑:
这是我的代码示例:
public class MyFunction implements FlatMapFunction
<Tuple2<String, Iterable<Bean>>, ComparedPerson> {
public Iterable<ProcessedBean> call(Tuple2<Key, Iterable<Bean>> input) throws Exception {
List<ProcessedBean> output = new ArrayList<ProcessedBean>();
List<Bean> listToProcess = CollectionsUtil.makeList(input._2());
// In some cases size == 2, in others size > 100.000
for (int i = 0; i < listToProcess.size() - 1; i++) {
for (int j = i + 1; j < listToProcess.size(); j++) {
ProcessedBean processed = processData(listToProcess.get(i), listToProcess.get(j));
if (processed != null) {
output.add(processed);
}
}
}
return output;
}
双 for 将循环 n(n-1)/2
次,但这是无法避免的。
像这样的偏差通常是特定领域的。您可以将您的价值数据创建为 RDD 并加入其中。或者您可以尝试使用广播变量。或者您可以编写一个自定义分区程序来帮助以不同方式拆分数据。
但是,最终,这将取决于数据的计算和细节。
键的处理顺序对总计算时间没有影响。我可以想象的唯一的方差问题(有些值很小,有些值很大)是在处理结束时:一个大任务仍在 运行 而所有其他节点已经完成。
如果这是您所看到的,您可以尝试增加分区数。这将减少任务的大小,因此最后出现超大任务的可能性较小。
广播变量和分区器对性能没有帮助。我认为您应该专注于尽可能高效地进行所有内容的比较步骤。 (或者更好的是,避免它。我不认为二次算法在大数据中真的可持续。)
如果 'processData' 很昂贵,您可以并行化该步骤并在那里获得一些收益。
在伪代码中,它类似于:
def processData(bean1:Bean, bean2:Bean):Option[ProcessedData] = { ... }
val rdd:RDD[(Key, List[Bean])] = ...
val pairs:RDD[(Bean, Bean)] = rdd.flatMap((key, beans) => {
val output = mutable.List[ProcessedBean]()
val len = beans.length
for (var i=0; i < len - 1; i++) {
for (var j=i+1; j < len; j++) {
output.add((beans(i), beans(j)))
}
}
output
}).repartition(someNumber)
val result:RDD[ProcessedBean] = pairs
.map(beans => processData(beans._1, beans._2))
.filter(_.isDefined)
.map(_.get)
flatMap 步骤仍将受最大列表的限制,重新分区时会发生随机播放,但将 processData 步骤移到 N^2 步骤之外可以获得一些并行性。
我有一个包含数百万个键值对的 RDD 对,其中每个值都是一个列表,可能包含单个元素或数十亿个元素。这会导致性能不佳,因为大型组会阻塞集群的节点数小时,而需要几秒钟的组无法并行处理,因为整个集群已经很忙。
有什么可以改进的吗?
编辑:
给我带来问题的操作是 flatMap
,其中分析了给定键的整个列表。键未被触及,该操作将列表中的每个元素与列表的其余部分进行比较,这需要花费大量时间,但不幸的是它必须完成。这意味着整个列表需要同时在同一个节点中。生成的 RDD 将包含一个子列表,具体取决于 flatMap
.
在这种情况下我不能使用广播变量,因为在不同的键值对之间不会使用公共数据。至于分区器,根据 O'Reilly Learning Spark 一书,这种操作不会从分区器中受益,因为不涉及混洗(尽管我不确定这是否属实)。分区程序可以在这种情况下提供帮助吗?
第二次编辑:
这是我的代码示例:
public class MyFunction implements FlatMapFunction
<Tuple2<String, Iterable<Bean>>, ComparedPerson> {
public Iterable<ProcessedBean> call(Tuple2<Key, Iterable<Bean>> input) throws Exception {
List<ProcessedBean> output = new ArrayList<ProcessedBean>();
List<Bean> listToProcess = CollectionsUtil.makeList(input._2());
// In some cases size == 2, in others size > 100.000
for (int i = 0; i < listToProcess.size() - 1; i++) {
for (int j = i + 1; j < listToProcess.size(); j++) {
ProcessedBean processed = processData(listToProcess.get(i), listToProcess.get(j));
if (processed != null) {
output.add(processed);
}
}
}
return output;
}
双 for 将循环 n(n-1)/2
次,但这是无法避免的。
像这样的偏差通常是特定领域的。您可以将您的价值数据创建为 RDD 并加入其中。或者您可以尝试使用广播变量。或者您可以编写一个自定义分区程序来帮助以不同方式拆分数据。
但是,最终,这将取决于数据的计算和细节。
键的处理顺序对总计算时间没有影响。我可以想象的唯一的方差问题(有些值很小,有些值很大)是在处理结束时:一个大任务仍在 运行 而所有其他节点已经完成。
如果这是您所看到的,您可以尝试增加分区数。这将减少任务的大小,因此最后出现超大任务的可能性较小。
广播变量和分区器对性能没有帮助。我认为您应该专注于尽可能高效地进行所有内容的比较步骤。 (或者更好的是,避免它。我不认为二次算法在大数据中真的可持续。)
如果 'processData' 很昂贵,您可以并行化该步骤并在那里获得一些收益。
在伪代码中,它类似于:
def processData(bean1:Bean, bean2:Bean):Option[ProcessedData] = { ... }
val rdd:RDD[(Key, List[Bean])] = ...
val pairs:RDD[(Bean, Bean)] = rdd.flatMap((key, beans) => {
val output = mutable.List[ProcessedBean]()
val len = beans.length
for (var i=0; i < len - 1; i++) {
for (var j=i+1; j < len; j++) {
output.add((beans(i), beans(j)))
}
}
output
}).repartition(someNumber)
val result:RDD[ProcessedBean] = pairs
.map(beans => processData(beans._1, beans._2))
.filter(_.isDefined)
.map(_.get)
flatMap 步骤仍将受最大列表的限制,重新分区时会发生随机播放,但将 processData 步骤移到 N^2 步骤之外可以获得一些并行性。