即使不坚持,Spark 内存缓存也会不断增加
Spark memory cache keeps increasing even with unpersist
我正在遍历 3 个大文件并执行大量统计计算。
除了 1 个 CORE 和 1 个 MASTER 节点外,我有 55GB 的可用内存、8V 内核和最多 10 个可用的 TASK 节点。
以下是我实际代码的伪代码:
#Load MyConfigMeta file- this is a small file and will be a couple of times in the code
MyConfigMeta=spark.read.parquet("s3://path/MyConfigMeta.parquet")
MyConfigMeta=MyConfigMeta.persist(StorageLevel.MEMORY_AND_DISK)
#Very Large timeseries files
modules=["s3://path/file1.parquet",
"s3://path/file2.parquet",
"s3://path/file3.parquet"]
for file in modules:
out_filename=1
df1=spark.read.parquet(file)
df1=df1.join(MyConfigMeta, on=["key"], how="inner")
#Find out latest column values based on Timestamp
lim_max=df1.groupBy('key')\
.agg(f.max('TIME_STAMP').alias('TIME_STAMP'))
temp=df1.select('TIME_STAMP','key',''UL','LL')
lim_max=lim_max.join(temp, on=['TIME_STAMP','key'], how="left")\
.drop('TIME_STAMP')\
.distinct()
lim_max=lim_max.persist(StorageLevel.MEMORY_AND_DISK)
df1=df1.drop('UL,'LL')\
.join(lim_max, on=['key'], how="left")\
withColumn('out_clip', when(col('RESULT').between(col('LL'),col('UL')), 0).otherwise(1))\
df1=df1.persist(StorageLevel.MEMORY_AND_DISK) # This is a very large dataframe and will later be used for simulation
df2=df1.filter(col('out_clip')==0)\
.groupBy('key')\
.agg(f.round(expr('percentile(RESULT, 0.9999)'),4).alias('UPPER_PERCENTILE'),
f.round(expr('percentile(RESULT, 0.0001)'),4).alias('LOWER_PERCENTILE'))\
.withColumn('pcnt_clip', when(col('RESULT').between(col('LOWER_PERCENTILE'),col('UPPER_PERCENTILE')), 0).otherwise(1))\
.filter(col('pcnt_clip')==0)
stats=df2.groupBy('key')\
.agg(#Perform a bunch of statistical calculations (mean, avg, kurtosis, skew))
stats=stats.join(lim_max, on=['key'], how="left") #get back the columns from lim_max
lim_max=lim_max.unpersist()
stats=stats.withColumn('New_UL', #formula to calculate new limits)\
.withColumn('New_LL', #formula to calculate new limits)\
.join(MyConfigMeta, on=['key'], how="left")
#Simulate data
df_sim=df1.join(stats, on=['key'], how="inner")\
.withColumn('newOOC', when ((col('RESULT')<col('New_LL')) | (col('RESULT')>col('New_UL')), 1).otherwise(0))
df3=df_sim.groupBy('key')\
.agg(f.sum('newOOC').alias('simulated result'))
#Join back with stats to get statistcal data, context data along with simulated data
df4=df3.join(stats, on=['key'], how="inner")
#Write output file
df4.write.mode('overwrite').parquet("s3://path/sim_" +out_filename+ ".parquet")
df1=df1.unpersist()
spark.catalog.clearCache()
我的 spark-submit 配置是 6 executor-cores
和 driver-cores
、41GB executor-memory
、41GB driver-memory
、14GB spark.executor.memoryOverhead
和 9
num-executors`.
当我查看 Ganglia 中的内存图表时,我注意到第一个文件完成得很好,但是后续文件的计算失败了,因为它一直 运行ning 进入丢失节点问题
ExecutorLostFailure (executor 5 exited unrelated to the running tasks) Reason: Container marked as failed. Diagnostics: Container released on a lost node.
自从我不坚持 df1
数据帧并使用 spark.catalog.clearCache()
以来,我本以为缓存内存会显着清除。但是内存似乎在不断增加而没有被清除。
但是,如果我 运行 单独的文件似乎工作正常。
在这里,一大块内存被清除只是因为有 10 个执行者死亡并被列入黑名单。
有没有办法在 spark 中强制刷新内存?
或者还有其他原因导致我不断丢失节点?
您可以使用以下函数刷新 SparkContext 中的所有持久化数据集。它列出 RDD 并调用 unpersist 方法。在函数内部创建 DF 时特别有用。
def unpersist_dataframes() -> None:
for (id, rdd) in sc._jsc.getPersistentRDDs().items():
rdd.unpersist()
print("Unpersisted {} rdd".format(id))
为了监控持久化的数据帧,请改为检查 Storage tab from the SparkUI。不要担心 Ganglia 统计中的空闲内存,实际上这可能是您的资源没有得到充分利用的迹象。 Spark 明智地管理内存。
关于丢失的节点,如果您使用像 Databricks 这样的托管服务,它会在集群的事件日志中显示节点终止的原因。
我正在遍历 3 个大文件并执行大量统计计算。
除了 1 个 CORE 和 1 个 MASTER 节点外,我有 55GB 的可用内存、8V 内核和最多 10 个可用的 TASK 节点。
以下是我实际代码的伪代码:
#Load MyConfigMeta file- this is a small file and will be a couple of times in the code
MyConfigMeta=spark.read.parquet("s3://path/MyConfigMeta.parquet")
MyConfigMeta=MyConfigMeta.persist(StorageLevel.MEMORY_AND_DISK)
#Very Large timeseries files
modules=["s3://path/file1.parquet",
"s3://path/file2.parquet",
"s3://path/file3.parquet"]
for file in modules:
out_filename=1
df1=spark.read.parquet(file)
df1=df1.join(MyConfigMeta, on=["key"], how="inner")
#Find out latest column values based on Timestamp
lim_max=df1.groupBy('key')\
.agg(f.max('TIME_STAMP').alias('TIME_STAMP'))
temp=df1.select('TIME_STAMP','key',''UL','LL')
lim_max=lim_max.join(temp, on=['TIME_STAMP','key'], how="left")\
.drop('TIME_STAMP')\
.distinct()
lim_max=lim_max.persist(StorageLevel.MEMORY_AND_DISK)
df1=df1.drop('UL,'LL')\
.join(lim_max, on=['key'], how="left")\
withColumn('out_clip', when(col('RESULT').between(col('LL'),col('UL')), 0).otherwise(1))\
df1=df1.persist(StorageLevel.MEMORY_AND_DISK) # This is a very large dataframe and will later be used for simulation
df2=df1.filter(col('out_clip')==0)\
.groupBy('key')\
.agg(f.round(expr('percentile(RESULT, 0.9999)'),4).alias('UPPER_PERCENTILE'),
f.round(expr('percentile(RESULT, 0.0001)'),4).alias('LOWER_PERCENTILE'))\
.withColumn('pcnt_clip', when(col('RESULT').between(col('LOWER_PERCENTILE'),col('UPPER_PERCENTILE')), 0).otherwise(1))\
.filter(col('pcnt_clip')==0)
stats=df2.groupBy('key')\
.agg(#Perform a bunch of statistical calculations (mean, avg, kurtosis, skew))
stats=stats.join(lim_max, on=['key'], how="left") #get back the columns from lim_max
lim_max=lim_max.unpersist()
stats=stats.withColumn('New_UL', #formula to calculate new limits)\
.withColumn('New_LL', #formula to calculate new limits)\
.join(MyConfigMeta, on=['key'], how="left")
#Simulate data
df_sim=df1.join(stats, on=['key'], how="inner")\
.withColumn('newOOC', when ((col('RESULT')<col('New_LL')) | (col('RESULT')>col('New_UL')), 1).otherwise(0))
df3=df_sim.groupBy('key')\
.agg(f.sum('newOOC').alias('simulated result'))
#Join back with stats to get statistcal data, context data along with simulated data
df4=df3.join(stats, on=['key'], how="inner")
#Write output file
df4.write.mode('overwrite').parquet("s3://path/sim_" +out_filename+ ".parquet")
df1=df1.unpersist()
spark.catalog.clearCache()
我的 spark-submit 配置是 6 executor-cores
和 driver-cores
、41GB executor-memory
、41GB driver-memory
、14GB spark.executor.memoryOverhead
和 9
num-executors`.
当我查看 Ganglia 中的内存图表时,我注意到第一个文件完成得很好,但是后续文件的计算失败了,因为它一直 运行ning 进入丢失节点问题
ExecutorLostFailure (executor 5 exited unrelated to the running tasks) Reason: Container marked as failed. Diagnostics: Container released on a lost node.
自从我不坚持 df1
数据帧并使用 spark.catalog.clearCache()
以来,我本以为缓存内存会显着清除。但是内存似乎在不断增加而没有被清除。
但是,如果我 运行 单独的文件似乎工作正常。
在这里,一大块内存被清除只是因为有 10 个执行者死亡并被列入黑名单。
有没有办法在 spark 中强制刷新内存? 或者还有其他原因导致我不断丢失节点?
您可以使用以下函数刷新 SparkContext 中的所有持久化数据集。它列出 RDD 并调用 unpersist 方法。在函数内部创建 DF 时特别有用。
def unpersist_dataframes() -> None:
for (id, rdd) in sc._jsc.getPersistentRDDs().items():
rdd.unpersist()
print("Unpersisted {} rdd".format(id))
为了监控持久化的数据帧,请改为检查 Storage tab from the SparkUI。不要担心 Ganglia 统计中的空闲内存,实际上这可能是您的资源没有得到充分利用的迹象。 Spark 明智地管理内存。
关于丢失的节点,如果您使用像 Databricks 这样的托管服务,它会在集群的事件日志中显示节点终止的原因。