为什么我的 Spark mapPartition 函数变慢了?
Why my Spark mapPartition function is being slowed?
我的算法很简单:我使用 Spark 来分发 运行 在 Python 中进行交叉验证的进程的处理。我有 3 个工作人员,我所做的就是为每个工作人员分配一个二进制数组给 运行 Python 函数,Driver 然后获取结果并将它们存储在 JSON 中。下面是代码的简化:
stars_subsets = []
for i in range(15):
star_subset = (i + 1, generate_random_binary_array())
stars_subsets.append(star_subset)
stars_parallelized = sc.parallelize(stars_subsets)
result = stars_parallelized \
.partitionBy(NUMBER_OF_WORKERS, partitionFunc=lambda key: key * NUMBER_OF_WORKERS // len(stars_subsets)) \
.mapPartitions(lambda records: cross_validation(records), preservesPartitioning=True) \
.collect()
请注意,我生成分区的方式是让所有节点接收相同数量的数组进行测试,并且 mapPartition()
确保一次处理一个。
我的问题是,如果我 运行 仅在驱动程序上进行交叉验证,随着交叉验证中使用的二进制数组的大小增加,时间将如预期的那样线性增加(请忽略图片中的第一个峰值,它们是由我们正在测试的算法规则问题引起的。
顺序代码(仅驱动):
for records in stars_subsets:
cross_validation(records)
连续次数结果:
但是在 Spark 中分配计算时,有一些异常峰值,我不知道是什么原因造成的。我已经记录了 Tuning section of the official documentation 中指示的垃圾收集器行为,但我找不到任何超过 1 秒的 GC 操作。另外,我尝试将 GC 更改为 Java GC 造成瓶颈时推荐的 G1GC,但性能更差。
分布式交叉验证次数:
我开始相信存在不止一个 GC 而我们从错误的 GC 记录日志,或者还有另一个我们没有考虑的内部 Spark 问题。
非常欢迎任何帮助!
我发现了问题。这不是由于垃圾收集器,也不是由于某些网络现象。发生的事情是 Spark 将多个分区分配给同一个工作节点,即使有一个空闲工作节点也是如此。
发生这种情况时,每个worker所做的处理都占用了所有的核心,所以如果分配了两个分区,它们就是两个进程,需要机器的所有核心,从而降低了性能。
关于这个问题我已经打开another more specific question
我的算法很简单:我使用 Spark 来分发 运行 在 Python 中进行交叉验证的进程的处理。我有 3 个工作人员,我所做的就是为每个工作人员分配一个二进制数组给 运行 Python 函数,Driver 然后获取结果并将它们存储在 JSON 中。下面是代码的简化:
stars_subsets = []
for i in range(15):
star_subset = (i + 1, generate_random_binary_array())
stars_subsets.append(star_subset)
stars_parallelized = sc.parallelize(stars_subsets)
result = stars_parallelized \
.partitionBy(NUMBER_OF_WORKERS, partitionFunc=lambda key: key * NUMBER_OF_WORKERS // len(stars_subsets)) \
.mapPartitions(lambda records: cross_validation(records), preservesPartitioning=True) \
.collect()
请注意,我生成分区的方式是让所有节点接收相同数量的数组进行测试,并且 mapPartition()
确保一次处理一个。
我的问题是,如果我 运行 仅在驱动程序上进行交叉验证,随着交叉验证中使用的二进制数组的大小增加,时间将如预期的那样线性增加(请忽略图片中的第一个峰值,它们是由我们正在测试的算法规则问题引起的。
顺序代码(仅驱动):
for records in stars_subsets:
cross_validation(records)
连续次数结果:
但是在 Spark 中分配计算时,有一些异常峰值,我不知道是什么原因造成的。我已经记录了 Tuning section of the official documentation 中指示的垃圾收集器行为,但我找不到任何超过 1 秒的 GC 操作。另外,我尝试将 GC 更改为 Java GC 造成瓶颈时推荐的 G1GC,但性能更差。
分布式交叉验证次数:
我开始相信存在不止一个 GC 而我们从错误的 GC 记录日志,或者还有另一个我们没有考虑的内部 Spark 问题。
非常欢迎任何帮助!
我发现了问题。这不是由于垃圾收集器,也不是由于某些网络现象。发生的事情是 Spark 将多个分区分配给同一个工作节点,即使有一个空闲工作节点也是如此。
发生这种情况时,每个worker所做的处理都占用了所有的核心,所以如果分配了两个分区,它们就是两个进程,需要机器的所有核心,从而降低了性能。
关于这个问题我已经打开another more specific question