在循环中评估 Spark DataFrame 会随着每次迭代而减慢,所有工作都由控制器完成

Evaluating Spark DataFrame in loop slows down with every iteration, all work done by controller

我正在尝试使用 Spark 集群(在 AWS EMR 上 运行ning)到 link 组具有共同元素的项目。本质上,我有一些元素的组,如果一些元素在多个组中,我想创建一个包含所有这些组的元素的组。

我知道 GraphX 库,我尝试使用 graphframes 包(ConnectedComponents 算法)来解决这个任务,但似乎 graphframes 包还不够成熟并且非常浪费资源... 运行 它在我的数据集(cca 60GB)上它只是 运行s 内存不足,无论我如何调整 Spark 参数,如何分区和重新 -分区我的数据或我创建的集群有多大(图表很大)。

所以我编写了自己的代码来完成任务。 代码有效并且解决了我的问题,但每次迭代都会减慢速度。由于有时需要大约 10 次迭代才能完成,它可能 运行 很长,我无法弄清楚问题是什么。

我从一个 table (DataFrame) item_links 开始,它有两列:itemgroup_name。每个组中的项目都是唯一的,但在此 table 中不是。一个项目可以在多个组中。如果两个项目各有一行具有相同的组名,则它们属于同一组。

我首先按项目分组,并为每个项目找到它所属的所有组中最小的组名。我将此信息作为一个额外的列附加到原始 DataFrame 中。然后我通过按组名分组并在每个组中找到这个新列的最小值来创建一个新的 DataFrame。我将此 DataFrame 与我原来的 table 加入组名,并将组名列替换为该新列中的最小值。这个想法是,如果一个组包含也属于某个较小组的项目,则该组将被合并。在每次迭代中,它 links 组被越来越多的项目间接 link 编辑。

我是运行ning的代码是这样的:

print(" Merging groups that have common items...")

n_partitions = 32

merge_level = 0

min_new_group = "min_new_group_{}".format(merge_level)

# For every item identify the (alphabetically) first group in which this item was found
# and add a new column min_new_group with that information for every item.
first_group = item_links \
                    .groupBy('item') \
                    .agg( min('group_name').alias(min_new_group) ) \
                    .withColumnRenamed('item', 'item_id') \
                    .coalesce(n_partitions) \
                    .cache()

item_links = item_links \
                .join( first_group,
                       item_links['item'] == first_group['item_id'] ) \
                .drop(first_group['item_id']) \
                .coalesce(n_partitions) \
                .cache()

first_group.unpersist()

# In every group find the (alphabetically) smallest min_new_group value.
# If the group contains a item that was in some other group,
# this value will be different than the current group_name.
merged_groups = item_links \
                    .groupBy('group_name') \
                    .agg(
                        min(col(min_new_group)).alias('merged_group')
                    ) \
                    .withColumnRenamed('group_name', 'group_to_merge') \
                    .coalesce(n_partitions) \
                    .cache()

# Replace the group_name column with the lowest group that any of the item in the group had.
item_links = item_links \
                .join( merged_groups,
                       item_links['group_name'] == merged_groups['group_to_merge'] ) \
                .drop(item_links['group_name']) \
                .drop(merged_groups['group_to_merge']) \
                .drop(item_links[min_new_group]) \
                .withColumnRenamed('merged_group', 'group_name') \
                .coalesce(n_partitions) \
                .cache()

# Count the number of common items found
common_items_count = merged_groups.filter(col('merged_group') != col('group_to_merge')).count()

merged_groups.unpersist()

# just some debug output
print("  level {}: found {} common items".format(merge_level, common_items_count))

# As long as the number of groups keep decreasing (groups are merged together), repeat the operation.
while (common_items_count > 0):
    merge_level += 1

    min_new_group = "min_new_group_{}".format(merge_level)

    # for every item find new minimal group...
    first_group = item_links \
                        .groupBy('item') \
                        .agg(
                            min(col('group_name')).alias(min_new_group)
                        ) \
                        .withColumnRenamed('item', 'item_id') \
                        .coalesce(n_partitions) \
                        .cache() 

    item_links = item_links \
                    .join( first_group,
                           item_links['item'] == first_group['item_id'] ) \
                    .drop(first_group['item']) \
                    .coalesce(n_partitions) \
                    .cache()

    first_group.unpersist()

    # find groups that have items from other groups...
    merged_groups = item_links \
                        .groupBy(col('group_name')) \
                        .agg(
                            min(col(min_new_group)).alias('merged_group')
                        ) \
                        .withColumnRenamed('group_name', 'group_to_merge') \
                        .coalesce(n_partitions) \
                        .cache()

    # merge the groups with items from other groups...
    item_links = item_links \
                    .join( merged_groups,
                           item_links['group_name'] == merged_groups['group_to_merge'] ) \
                    .drop(item_links['group_name']) \
                    .drop(merged_groups['group_to_merge']) \
                    .drop(item_links[min_new_group]) \
                    .withColumnRenamed('merged_group', 'group_name') \
                    .coalesce(n_partitions) \
                    .cache()

    common_items_count = merged_groups.filter(col('merged_group') != col('group_to_merge')).count()

    merged_groups.unpersist()

    print("  level {}: found {} common items".format(merge_level, common_items_count))

正如我所说,它有效,但问题是,每次迭代都会减慢速度。迭代 1-3 运行 只需几秒或几分钟。迭代 5 运行s 大约 20-40 分钟。迭代 6 有时甚至没有完成,因为控制器 运行s 内存不足(控制器 14 GB,具有 20 CPU 个核心的整个集群大约 140 GB RAM...测试数据是大约 30 GB)。

当我在 Ganglia 中监控集群时,我看到,在每次迭代之后,工作人员执行的工作越来越少,而控制器执行的工作越来越多。网络流量也下降到零。在初始阶段之后,内存使用量减少了 stable。

我阅读了很多关于重新分区、转换 Spark 参数和 shuffle 操作背景的知识,我尽力优化了所有内容,但我不知道这里发生了什么。下面是随着时间的推移我的集群节点(控制器节点为黄色)的负载,因为上面的代码是 运行ning.

您的代码逻辑正确。只是你从不调用 item_links.unpersist() 所以首先它变慢(尝试与本地磁盘交换)然后 OOM。

Ganglia 中的内存使用情况可能具有误导性。它不会改变,因为内存是在脚本开始时分配给执行程序的,无论他们以后是否使用它。您可以检查 Spark UI 的存储/执行器状态。

我通过在每次迭代结束时将 DataFrame 保存到 HDFS 并在下一次迭代开始时从 HDFS 读回它来解决这个问题。

自从我这样做后,程序运行起来轻而易举,没有任何减速、内存过度使用或驱动程序过载的迹象。

我还是不明白为什么会这样,所以我把问题悬而未决。

一个简单的复现场景:

import time
from pyspark import SparkContext

sc = SparkContext()

def push_and_pop(rdd):
    # two transformations: moves the head element to the tail
    first = rdd.first()
    return rdd.filter(
        lambda obj: obj != first
    ).union(
        sc.parallelize([first])
    )

def serialize_and_deserialize(rdd):
    # perform a collect() action to evaluate the rdd and create a new instance
    return sc.parallelize(rdd.collect())

def do_test(serialize=False):
    rdd = sc.parallelize(range(1000))
    for i in xrange(25):
        t0 = time.time()
        rdd = push_and_pop(rdd)
        if serialize:
            rdd = serialize_and_deserialize(rdd)
        print "%.3f" % (time.time() - t0)

do_test()

显示 25 次迭代期间的主要减速:

0.597 0.117 0.186 0.234 0.288 0.309 0.386 0.439 0.507 0.529 0.553 0.586 0.710 0.728 0.779 0.896 0.866 0.881 0.956 1.049 1.069 1.061 1.149 1.189 1.201

(第一次迭代因为初始化效果比较慢,第二次迭代很快,以后每次迭代都比较慢)

原因似乎是惰性转换链的增长。我们可以通过使用动作滚动 RDD 来检验假设。

do_test(True)

0.897 0.256 0.233 0.229 0.220 0.238 0.234 0.252 0.240 0.267 0.260 0.250 0.244 0.266 0.295 0.464 0.292 0.348 0.320 0.258 0.250 0.201 0.197 0.243 0.230

collect()parallelize() 每次迭代增加大约 0.1 秒,但完全消除了增量减速。

尝试打印 dataFrame.explain 以查看逻辑计划。 每次迭代,此 Dataframe 上的转换都会不断累加到逻辑计划中,因此评估时间会不断累加。

您可以使用以下解决方案作为解决方法:

dataFRame.rdd.localCheckpoint()

这会将此 DataFrame 的 RDD 写入内存并删除谱系,然后根据写入内存的数据创建 RDD。

这样做的好处是您不需要将 RDD 写入 HDFS 或磁盘。 但是,这也会带来一些问题,这些问题可能会或可能不会影响您。您可以阅读 "localCheckPointing" 方法的文档了解详细信息。