Spark 通过一个小的收集 java 堆 space 内存不足
Spark goes java heap space out of memory with a small collect
我遇到了 Spark、它的驱动程序和 OoM 问题。
目前我有一个数据框,它是用几个连接的源(实际上是镶木地板格式的不同 tables)构建的,并且有数千个元组。他们有一个日期,代表记录的创建日期,而且显然是少数。
我执行以下操作:
from pyspark.sql.functions import year, month
# ...
selectionRows = inputDataframe.select(year('registration_date').alias('year'), month('registration_date').alias('month')).distinct()
selectionRows.show() # correctly shows 8 tuples
selectionRows = selectionRows.collect() # goes heap space OoM
print(selectionRows)
读取内存消耗统计显示驱动程序不超过 ~60%。我认为驱动程序应该只加载不同的子集,而不是整个数据帧。
我错过了什么吗?是否有可能以更智能的方式收集那几行?我需要它们作为下推谓词来加载辅助数据框。
非常感谢!
编辑/解决方案
阅读评论并详细说明我的个人需求后,我在每个“join/elaborate”步骤缓存数据帧,以便在时间轴中执行以下操作:
- 加入加载table
- 排队需要的转换
- 应用缓存转换
- 打印计数以跟踪基数(主要用于跟踪/调试目的)并因此应用所有转换 + 缓存
- 如果可用(tick/tock 范例)
,取消保留上一个兄弟步骤的缓存
这将一些复杂的 ETL 作业减少到原始时间的 20%(因为以前它是在每次计数时应用每个先前步骤的转换)。
经验教训:)
阅读评论后,我详细阐述了我的用例的解决方案。
如问题中所述,我将几个 table 彼此连接到一个“目标数据帧”中,并且在每次迭代时我都会进行一些转换,如下所示:
# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
print(f'Target after table "other": {target.count()}')
缓慢/OoM 的问题是 Spark 被迫在每个 table 处从头到尾进行所有转换,因为结尾 count
,使得它在每个 [= 处越来越慢=21=] / 迭代.
我找到的解决方案是在每次迭代时缓存数据帧,如下所示:
cache: DataFrame = null
# ...
# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
target = target.cache()
target_count = target.count() # actually do the cache
if cache:
cache.unpersist() # free the memory from the old cache version
cache = target
print(f'Target after table "other": {target_count}')
我遇到了 Spark、它的驱动程序和 OoM 问题。
目前我有一个数据框,它是用几个连接的源(实际上是镶木地板格式的不同 tables)构建的,并且有数千个元组。他们有一个日期,代表记录的创建日期,而且显然是少数。
我执行以下操作:
from pyspark.sql.functions import year, month
# ...
selectionRows = inputDataframe.select(year('registration_date').alias('year'), month('registration_date').alias('month')).distinct()
selectionRows.show() # correctly shows 8 tuples
selectionRows = selectionRows.collect() # goes heap space OoM
print(selectionRows)
读取内存消耗统计显示驱动程序不超过 ~60%。我认为驱动程序应该只加载不同的子集,而不是整个数据帧。
我错过了什么吗?是否有可能以更智能的方式收集那几行?我需要它们作为下推谓词来加载辅助数据框。
非常感谢!
编辑/解决方案
阅读评论并详细说明我的个人需求后,我在每个“join/elaborate”步骤缓存数据帧,以便在时间轴中执行以下操作:
- 加入加载table
- 排队需要的转换
- 应用缓存转换
- 打印计数以跟踪基数(主要用于跟踪/调试目的)并因此应用所有转换 + 缓存
- 如果可用(tick/tock 范例) ,取消保留上一个兄弟步骤的缓存
这将一些复杂的 ETL 作业减少到原始时间的 20%(因为以前它是在每次计数时应用每个先前步骤的转换)。
经验教训:)
阅读评论后,我详细阐述了我的用例的解决方案。
如问题中所述,我将几个 table 彼此连接到一个“目标数据帧”中,并且在每次迭代时我都会进行一些转换,如下所示:
# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
print(f'Target after table "other": {target.count()}')
缓慢/OoM 的问题是 Spark 被迫在每个 table 处从头到尾进行所有转换,因为结尾 count
,使得它在每个 [= 处越来越慢=21=] / 迭代.
我找到的解决方案是在每次迭代时缓存数据帧,如下所示:
cache: DataFrame = null
# ...
# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
target = target.cache()
target_count = target.count() # actually do the cache
if cache:
cache.unpersist() # free the memory from the old cache version
cache = target
print(f'Target after table "other": {target_count}')