处理一对具有严重偏斜数据的 RDD 时性能不佳

Poor performance processing a pair RDD with very skewed data

我有一个包含数百万个键值对的 RDD 对,其中每个值都是一个列表,可能包含单个元素或数十亿个元素。这会导致性能不佳,因为大型组会阻塞集群的节点数小时,而需要几秒钟的组无法并行处理,因为整个集群已经很忙。

有什么可以改进的吗?

编辑:

给我带来问题的操作是 flatMap,其中分析了给定键的整个列表。键未被触及,该操作将列表中的每个元素与列表的其余部分进行比较,这需要花费大量时间,但不幸的是它必须完成。这意味着整个列表需要同时在同一个节点中。生成的 RDD 将包含一个子列表,具体取决于 flatMap.

中计算的值

在这种情况下我不能使用广播变量,因为在不同的键值对之间不会使用公共数据。至于分区器,根据 O'Reilly Learning Spark 一书,这种操作不会从分区器中受益,因为不涉及混洗(尽管我不确定这是否属实)。分区程序可以在这种情况下提供帮助吗?

第二次编辑:

这是我的代码示例:

public class MyFunction implements FlatMapFunction
    <Tuple2<String, Iterable<Bean>>, ComparedPerson>  {


public Iterable<ProcessedBean> call(Tuple2<Key, Iterable<Bean>> input) throws Exception {
    List<ProcessedBean> output = new ArrayList<ProcessedBean>();
    List<Bean> listToProcess = CollectionsUtil.makeList(input._2());

    // In some cases size == 2, in others size > 100.000
    for (int i = 0; i < listToProcess.size() - 1; i++) {
        for (int j = i + 1; j < listToProcess.size(); j++) {
            ProcessedBean processed = processData(listToProcess.get(i), listToProcess.get(j));

            if (processed != null) {
                output.add(processed);
            }
        }
    }

    return output;
}

双 for 将循环 n(n-1)/2 次,但这是无法避免的。

像这样的偏差通常是特定领域的。您可以将您的价值数据创建为 RDD 并加入其中。或者您可以尝试使用广播变量。或者您可以编写一个自定义分区程序来帮助以不同方式拆分数据。

但是,最终,这将取决于数据的计算和细节。

键的处理顺序对总计算时间没有影响。我可以想象的唯一的方差问题(有些值很小,有些值很大)是在处理结束时:一个大任务仍在 运行 而所有其他节点已经完成。

如果这是您所看到的,您可以尝试增加分区数。这将减少任务的大小,因此最后出现超大任务的可能性较小。

广播变量和分区器对性能没有帮助。我认为您应该专注于尽可能高效地进行所有内容的比较步骤。 (或者更好的是,避免它。我不认为二次算法在大数据中真的可持续。)

如果 'processData' 很昂贵,您可以并行化该步骤并在那里获得一些收益。

在伪代码中,它类似于:

def processData(bean1:Bean, bean2:Bean):Option[ProcessedData] = { ... }

val rdd:RDD[(Key, List[Bean])] = ...

val pairs:RDD[(Bean, Bean)] = rdd.flatMap((key, beans) => {
    val output = mutable.List[ProcessedBean]()
    val len = beans.length
    for (var i=0; i < len - 1; i++) {
        for (var j=i+1; j < len; j++) {
            output.add((beans(i), beans(j)))
        }
    }
    output
}).repartition(someNumber)

val result:RDD[ProcessedBean] = pairs
    .map(beans => processData(beans._1, beans._2))
    .filter(_.isDefined)
    .map(_.get)

flatMap 步骤仍将受最大列表的限制,重新分区时会发生随机播放,但将 processData 步骤移到 N^2 步骤之外可以获得一些并行性。