Caching and Loops in (Py)Spark

I know that 'for' and 'while' loops are generally to be avoided when working with Spark. My question is about optimizing a 'while' loop, though if I'm missing a solution that would make it unnecessary, I'm all ears.

I'm not sure I can demonstrate the problem with toy data (the processing times are very long and compound as the loop progresses), but here is some pseudocode:

from pyspark.sql.functions import col

# I have a function, called 'enumerator', which involves several joins and window functions.
# I run this function on my base dataset, df0, and it returns df1
df1 = enumerator(df0, param1 = apple, param2 = banana)

# Check for some condition in df1, then count number of rows in the result
counter = df1 \
.filter(col('X') == some_condition) \
.count()

# If there are rows meeting this condition, start a while loop
while counter > 0:
  print('Starting with counter: ', str(counter))
  
  # Run the enumerator function on df1 again
  df2 = enumerator(df1, param1= apple, param2 = banana)
  
  # Check for the condition again, then continue the while loop if necessary
  counter = df2 \
  .filter(col('X') == some_condition) \
  .count()
  
  df1 = df2

# After the while loop finishes, I take the last resulting dataframe and I will do several more operations and analyses downstream  
final_df = df2

An important aspect of the enumerator function is a sequence in the 'look back' window, so it may need to run several times before all the necessary corrections are made.
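
For a rough sense of what that looks like, here is a stripped-down sketch of the look-back part (the column names 'group_id', 'seq', 'value' and the correction rule are invented for illustration, not my actual logic):

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when

def enumerator(df, param1, param2):
  # a single 'look back' pass: compare each row to the previous row in its group
  # and flag rows that still need correcting; param1/param2 are ignored in this sketch
  w = Window.partitionBy('group_id').orderBy('seq')
  prev_value = lag('value').over(w)
  return df.withColumn('X', when(prev_value > col('value'), 'needs_fix').otherwise('ok'))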

In my heart of hearts I know this is ugly, but the analysis inside the windowing/ranking/sequential functions is critical. My understanding is that the underlying Spark query plan gets more and more convoluted as the loop continues. Are there any best practices I should be adopting in this situation? Should I be caching at any point, before the while loop starts, or within the loop itself?

You should definitely cache/persist the dataframes, otherwise every iteration in the while loop will start from scratch from df0. Also, you may want to unpersist the used dataframes to free up disk/memory space.
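
If you want to see the effect, print the physical plan of the intermediate dataframe inside the loop: without caching the plan keeps growing and traces all the way back to df0, whereas a cached dataframe is read through an InMemoryTableScan node.

# inside the loop: inspect the physical plan of the freshly built dataframe
df2.explain()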

Another point to optimize is not to do a count, but to use a cheaper action such as df.take(1). If that returns nothing, then counter == 0. Applied to your pseudocode:

df1 = enumerator(df0, param1 = apple, param2 = banana)
df1.cache()

# Check for some condition in df1, then count number of rows in the result
counter = len(df1.filter(col('X') == some_condition).take(1))

while counter > 0:
  print('Starting with counter: ', str(counter))
  
  df2 = enumerator(df1, param1 = apple, param2 = banana)
  df2.cache()

  counter = len(df2.filter(col('X') == some_condition).take(1))
  df1.unpersist()    # unpersist df1 to free memory/disk; the name is rebound to df2 below
  
  df1 = df2

final_df = df2
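
One last housekeeping note: with this pattern the final df2 stays cached, which is what you want while you run your downstream operations on final_df; once that downstream work is finished you can release it as well.

# after the downstream analyses are done, free the last cached dataframe too
final_df.unpersist()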