Spark 通过一个小的收集 java 堆 space 内存不足

Spark goes java heap space out of memory with a small collect

我遇到了 Spark、它的驱动程序和 OoM 问题。

目前我有一个数据框,它是用几个连接的源(实际上是镶木地板格式的不同 tables)构建的,并且有数千个元组。他们有一个日期,代表记录的创建日期,而且显然是少数。

我执行以下操作:

from pyspark.sql.functions import year, month

# ...

selectionRows = inputDataframe.select(year('registration_date').alias('year'), month('registration_date').alias('month')).distinct()
selectionRows.show() # correctly shows 8 tuples
selectionRows = selectionRows.collect() # goes heap space OoM
print(selectionRows)

读取内存消耗统计显示驱动程序不超过 ~60%。我认为驱动程序应该只加载不同的子集,而不是整个数据帧。

我错过了什么吗?是否有可能以更智能的方式收集那几行?我需要它们作为下推谓词来加载辅助数据框。

非常感谢!

编辑/解决方案

阅读评论并详细说明我的个人需求后,我在每个“join/elaborate”步骤缓存数据帧,以便在时间轴中执行以下操作:

这将一些复杂的 ETL 作业减少到原始时间的 20%(因为以前它是在每次计数时应用每个先前步骤的转换)。

经验教训:)

阅读评论后,我详细阐述了我的用例的解决方案。

如问题中所述,我将几个 table 彼此连接到一个“目标数据帧”中,并且在每次迭代时我都会进行一些转换,如下所示:

# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
print(f'Target after table "other": {target.count()}')

缓慢/OoM 的问题是 Spark 被迫在每个 table 处从头到尾进行所有转换,因为结尾 count,使得它在每个 [= 处越来越慢=21=] / 迭代.

我找到的解决方案是在每次迭代时缓存数据帧,如下所示:

cache: DataFrame = null

# ...

# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)

target = target.cache()
target_count = target.count() # actually do the cache
if cache:
  cache.unpersist() # free the memory from the old cache version
cache = target

print(f'Target after table "other": {target_count}')