为什么我的 Spark mapPartition 函数变慢了?

Why my Spark mapPartition function is being slowed?

我的算法很简单:我使用 Spark 来分发 运行 在 Python 中进行交叉验证的进程的处理。我有 3 个工作人员,我所做的就是为每个工作人员分配一个二进制数组给 运行 Python 函数,Driver 然后获取结果并将它们存储在 JSON 中。下面是代码的简化:

stars_subsets = []
for i in range(15):
    star_subset = (i + 1, generate_random_binary_array())
    stars_subsets.append(star_subset)

stars_parallelized = sc.parallelize(stars_subsets)

result = stars_parallelized \
    .partitionBy(NUMBER_OF_WORKERS, partitionFunc=lambda key: key * NUMBER_OF_WORKERS // len(stars_subsets)) \
    .mapPartitions(lambda records: cross_validation(records), preservesPartitioning=True) \
    .collect()

请注意,我生成分区的方式是让所有节点接收相同数量的数组进行测试,并且 mapPartition() 确保一次处理一个。 我的问题是,如果我 运行 仅在驱动程序上进行交叉验证,随着交叉验证中使用的二进制数组的大小增加,时间将如预期的那样线性增加(请忽略图片中的第一个峰值,它们是由我们正在测试的算法规则问题引起的。

顺序代码(仅驱动):

for records in stars_subsets:
    cross_validation(records)

连续次数结果:

但是在 Spark 中分配计算时,有一些异常峰值,我不知道是什么原因造成的。我已经记录了 Tuning section of the official documentation 中指示的垃圾收集器行为,但我找不到任何超过 1 秒的 GC 操作。另外,我尝试将 GC 更改为 Java GC 造成瓶颈时推荐的 G1GC,但性能更差。

分布式交叉验证次数:

我开始相信存在不止一个 GC 而我们从错误的 GC 记录日志,或者还有另一个我们没有考虑的内部 Spark 问题。

非常欢迎任何帮助!

我发现了问题。这不是由于垃圾收集器,也不是由于某些网络现象。发生的事情是 Spark 将多个分区分配给同一个工作节点,即使有一个空闲工作节点也是如此。

发生这种情况时,每个worker所做的处理都占用了所有的核心,所以如果分配了两个分区,它们就是两个进程,需要机器的所有核心,从而降低了性能。

关于这个问题我已经打开another more specific question