For Loop 在 EMR (pyspark) 中不断重启
For Loop keeps restarting in EMR (pyspark)
我有一个嵌套的 for 循环,它在内部循环中对数据帧执行 10 次操作,并在完成内部循环后将生成的 10 个数据帧连接到一个数据帧中。
更新: 我使用字典创建一个数据帧列表来存储每个操作然后合并它们位于内部循环的末尾。
然后将其写入带有outloop迭代次数的parquet文件。
outerloop 有 6 次迭代,因此应该产生 6 个 parquet 文件。
事情是这样的:
train=0
for i in range(0,6):
train=train+30
#For loop to aggregate input and create 10 output dataframes
dfnames={}
for j in range(0,10):
ident="_"+str(j)
#Load dataframe of around 1M rows
df=spark.read.parquet("s3://path")
dfnames['df'+ident]= #Perform aggregations and operations
#Combine the 10 datframes into a single df
df_out=df_1.uniionByName(d_2).unionByName(df_3)...unionByName(df_10)
#Write to output parquet file
df_out.write.mode('overwrite').parquet("s3://path/" + str(train) +".parquet"
在完成外循环的第 3 次迭代之前,它似乎工作正常。然后出于某种原因,它使用另一个尝试 ID 重新启动循环。
所以我得到了前 3 个文件,但它没有进入第 4 次迭代,而是重新启动以重新提供第一个文件。我没有得到任何失败的阶段或工作。
我已经尝试 运行 for 循环单独使用虚拟变量和打印语句(不加载大型数据帧等)并且它们可以很好地完成。
我认为这与循环后刷新内存的方式有关。
这些是我的 EMR Spark 运行 条件:
我在一个 EMR 集群上 运行 这个,有 5 个执行器、5 个驱动程序节点和 10 个实例,总共有 50 个内核。 spark executor和driver内存各45G,合计约583G。
典型的shuffle读250G,shuffle写331G。
一些相关的 Spark 环境变量如下所示:
我在循环或内存管理方面做错了什么吗?
任何见解将不胜感激!
你如何在这一行之前得到你的 df1、df2...?
#Combine the 10 datframes into a single df df_out=df1.uniionByName(d2).unionByName(df3)...unionByName(df10)
我的猜测是,您的数据框计划越来越大,这可能会导致问题。
我建议在内部循环中创建一个数据帧列表,并使用 reduce
方法合并它们。
如下所示
from functools import reduce
from pyspark.sql import DataFrame
df_list = []
for j in range(0,10):
#Load dataframe of around 1M rows
df = spark.read.parquet("s3://path")
transformed_df = #do your transforms
df_list.append(transformed_df)
final_df = reduce(DataFrame.unionByName, df_list)
尽量不要将 Python 数据结构与 Spark 数据结构相结合。
您想将 for 循环转换为 map-reduce、foreach 设计形式。
与此同时,您可以在每次迭代中创建一个缓存/火花检查点,以避免从头开始重新运行整个 DAG。
要缓存您的数据:
df.cache()
用于检查点
spark.sparkContext.setCheckpointDir('<some path>')
df.checkpoint()
一旦您使用 spark 构造而不是 python 构造,这些将显示性能和规模改进。例如,将 for 循环替换为 foreach,将列表的联合替换为 map reduce。
我有一个嵌套的 for 循环,它在内部循环中对数据帧执行 10 次操作,并在完成内部循环后将生成的 10 个数据帧连接到一个数据帧中。
更新: 我使用字典创建一个数据帧列表来存储每个操作然后合并它们位于内部循环的末尾。
然后将其写入带有outloop迭代次数的parquet文件。 outerloop 有 6 次迭代,因此应该产生 6 个 parquet 文件。
事情是这样的:
train=0
for i in range(0,6):
train=train+30
#For loop to aggregate input and create 10 output dataframes
dfnames={}
for j in range(0,10):
ident="_"+str(j)
#Load dataframe of around 1M rows
df=spark.read.parquet("s3://path")
dfnames['df'+ident]= #Perform aggregations and operations
#Combine the 10 datframes into a single df
df_out=df_1.uniionByName(d_2).unionByName(df_3)...unionByName(df_10)
#Write to output parquet file
df_out.write.mode('overwrite').parquet("s3://path/" + str(train) +".parquet"
在完成外循环的第 3 次迭代之前,它似乎工作正常。然后出于某种原因,它使用另一个尝试 ID 重新启动循环。 所以我得到了前 3 个文件,但它没有进入第 4 次迭代,而是重新启动以重新提供第一个文件。我没有得到任何失败的阶段或工作。
我已经尝试 运行 for 循环单独使用虚拟变量和打印语句(不加载大型数据帧等)并且它们可以很好地完成。 我认为这与循环后刷新内存的方式有关。
这些是我的 EMR Spark 运行 条件: 我在一个 EMR 集群上 运行 这个,有 5 个执行器、5 个驱动程序节点和 10 个实例,总共有 50 个内核。 spark executor和driver内存各45G,合计约583G。 典型的shuffle读250G,shuffle写331G。
一些相关的 Spark 环境变量如下所示:
我在循环或内存管理方面做错了什么吗? 任何见解将不胜感激!
你如何在这一行之前得到你的 df1、df2...?
#Combine the 10 datframes into a single df df_out=df1.uniionByName(d2).unionByName(df3)...unionByName(df10)
我的猜测是,您的数据框计划越来越大,这可能会导致问题。
我建议在内部循环中创建一个数据帧列表,并使用 reduce
方法合并它们。
如下所示
from functools import reduce
from pyspark.sql import DataFrame
df_list = []
for j in range(0,10):
#Load dataframe of around 1M rows
df = spark.read.parquet("s3://path")
transformed_df = #do your transforms
df_list.append(transformed_df)
final_df = reduce(DataFrame.unionByName, df_list)
尽量不要将 Python 数据结构与 Spark 数据结构相结合。
您想将 for 循环转换为 map-reduce、foreach 设计形式。
与此同时,您可以在每次迭代中创建一个缓存/火花检查点,以避免从头开始重新运行整个 DAG。
要缓存您的数据:
df.cache()
用于检查点
spark.sparkContext.setCheckpointDir('<some path>')
df.checkpoint()
一旦您使用 spark 构造而不是 python 构造,这些将显示性能和规模改进。例如,将 for 循环替换为 foreach,将列表的联合替换为 map reduce。