尝试保留数据帧时内存不足
Out of memory when trying to persist a dataframe
我在尝试保留数据帧时遇到内存不足错误,我真的不明白为什么。我有一个大约 20Gb 的数据框,其中包含 250 万行和大约 20 列。过滤此数据框后,我有 4 列和 50 万行。
现在我的问题是,当我保留过滤后的数据帧时,出现内存不足错误(超过使用的 20 Gb 物理内存中的 25.4Gb)。我尝试过在不同的存储级别上坚持
df = spark.read.parquet(path) # 20 Gb
df_filter = df.select('a', 'b', 'c', 'd').where(df.a == something) # a few Gb
df_filter.persist(StorageLevel.MEMORY_AND_DISK)
df_filter.count()
我的集群有 8 个节点,每个节点有 30Gb 内存。
您知道 OOM 可能来自哪里吗?
只是一些有助于确定根本原因的建议...
您可能有(或组合)...
- 倾斜的源数据分区拆分大小很难处理并导致垃圾收集、OOM 等(这些方法对我有帮助,但每个用例可能有更好的方法)
# to check num partitions
df_filter.rdd.getNumPartitions()
# to repartition (**does cause shuffle**) to increase parallelism and help with data skew
df_filter.repartition(...) # monitor/debug performance in spark ui after setting
- 太多 little/too 在配置中设置了很多 executors/ram/cores
# check via
spark.sparkContext.getConf().getAll()
# these are the ones you want to watch out for
'''
--num-executors
--executor-cores
--executor-memory
'''
- 广泛的转换也改变了大小 little/too 许多 => 尝试一般调试检查以查看持久化时将触发的转换 + 找到它们的输出分区到磁盘的数量
# debug directed acyclic graph [dag]
df_filter.explain() # also "babysit" in spark UI to examine performance of each node/partitions to get specs when you are persisting
# check output partitions if shuffle occurs
spark.conf.get("spark.sql.shuffle.partitions")
我在尝试保留数据帧时遇到内存不足错误,我真的不明白为什么。我有一个大约 20Gb 的数据框,其中包含 250 万行和大约 20 列。过滤此数据框后,我有 4 列和 50 万行。
现在我的问题是,当我保留过滤后的数据帧时,出现内存不足错误(超过使用的 20 Gb 物理内存中的 25.4Gb)。我尝试过在不同的存储级别上坚持
df = spark.read.parquet(path) # 20 Gb
df_filter = df.select('a', 'b', 'c', 'd').where(df.a == something) # a few Gb
df_filter.persist(StorageLevel.MEMORY_AND_DISK)
df_filter.count()
我的集群有 8 个节点,每个节点有 30Gb 内存。
您知道 OOM 可能来自哪里吗?
只是一些有助于确定根本原因的建议...
您可能有(或组合)...
- 倾斜的源数据分区拆分大小很难处理并导致垃圾收集、OOM 等(这些方法对我有帮助,但每个用例可能有更好的方法)
# to check num partitions
df_filter.rdd.getNumPartitions()
# to repartition (**does cause shuffle**) to increase parallelism and help with data skew
df_filter.repartition(...) # monitor/debug performance in spark ui after setting
- 太多 little/too 在配置中设置了很多 executors/ram/cores
# check via
spark.sparkContext.getConf().getAll()
# these are the ones you want to watch out for
'''
--num-executors
--executor-cores
--executor-memory
'''
- 广泛的转换也改变了大小 little/too 许多 => 尝试一般调试检查以查看持久化时将触发的转换 + 找到它们的输出分区到磁盘的数量
# debug directed acyclic graph [dag]
df_filter.explain() # also "babysit" in spark UI to examine performance of each node/partitions to get specs when you are persisting
# check output partitions if shuffle occurs
spark.conf.get("spark.sql.shuffle.partitions")