Caching and Loops in (Py)Spark
I know that 'for' and 'while' loops are generally to be avoided when working with Spark. My question is about optimizing a 'while' loop, though I'm all ears if there is a solution that makes it unnecessary.
I'm not sure I can reproduce the issue with toy data (the processing times are very long and compound as the loop progresses), but here is some pseudocode:
### I have a function - called 'enumerator' - which involves several joins and window functions.
# I run this function on my base dataset, df0, and return df1
df1 = enumerator(df0, param1 = apple, param2 = banana)

# Check for some condition in df1, then count number of rows in the result
counter = df1 \
    .filter(col('X') == some_condition) \
    .count()

# If there are rows meeting this condition, start a while loop
while counter > 0:
    print('Starting with counter: ', str(counter))

    # Run the enumerator function on df1 again
    df2 = enumerator(df1, param1 = apple, param2 = banana)

    # Check for the condition again, then continue the while loop if necessary
    counter = df2 \
        .filter(col('X') == some_condition) \
        .count()

    df1 = df2

# After the while loop finishes, I take the last resulting dataframe and I will do several more operations and analyses downstream
final_df = df2
An important aspect of the enumerator function is that it 'looks back' over a sequence within a window, so it may need to run several times before all the necessary corrections are made.
In my heart I know this is ugly, but the analysis done inside the windowing/ranking/sequential functions is critical. My understanding is that the underlying Spark query plan gets more and more convoluted as the loop continues. Are there any best practices I should adopt in this situation? Should I be caching at any point - before the while loop starts, or within the loop itself?
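(For what it's worth, the plan growth is easy to see even on toy data. The sketch below is only a hypothetical stand-in for the real enumerator pipeline: it stacks a trivial transformation in a loop and prints the physical plan, which gets longer on every pass.)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the iterative pattern above: each pass layers another
# transformation on top of the previous dataframe, so the lineage keeps growing.
df = spark.range(10)
for i in range(3):
    df = df.withColumn('step_' + str(i), F.col('id') + i)
    df.explain()  # the printed plan gets longer with each iteration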
You should definitely cache/persist the dataframes, otherwise every iteration in the while loop will start all over from df0. Also, you may want to unpersist the used dataframes to free up disk/memory space.
Another point to optimize: rather than doing a count, use a cheaper operation such as df.take(1). If it returns nothing, then counter == 0.
df1 = enumerator(df0, param1 = apple, param2 = banana)
df1.cache()

# Check for some condition in df1, then count number of rows in the result
counter = len(df1.filter(col('X') == some_condition).take(1))

while counter > 0:
    print('Starting with counter: ', str(counter))
    df2 = enumerator(df1, param1 = apple, param2 = banana)
    df2.cache()
    counter = len(df2.filter(col('X') == some_condition).take(1))
    df1.unpersist()  # unpersist df1 as it will be overwritten
    df1 = df2

final_df = df2
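As a quick illustration of the take(1) check (a self-contained toy sketch with made-up data, not the real enumerator output): finding a single matching row is enough to decide whether to keep looping, so there is no need to count every row.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Toy dataframe standing in for df1; 'X' plays the role of the condition column.
df = spark.createDataFrame([(1, 'ok'), (2, 'fix_me')], ['id', 'X'])

# take(1) stops as soon as one matching row is found; count() would scan everything.
counter = len(df.filter(col('X') == 'fix_me').take(1))   # 1 -> the loop continues
done = len(df.filter(col('X') == 'missing').take(1))     # 0 -> the loop would stop
print(counter, done)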