在 Pyspark 中为组连接函数持久化循环数据帧

Persisting loop dataframes for group concat functions in Pyspark

我正在尝试将 spark 数据框聚合到一个唯一 ID,从给定排序列的该 ID 的该列中选择第一个非空值。基本上复制 MySQL 的 group_concat 功能。

此处 的 SO post 对于复制单个列的 group_concat 非常有帮助。我需要为动态列列表执行此操作。

我宁愿不必为每一列复制此代码(十多个,将来可能是动态的),所以我试图在一个循环中实现(我知道在 spark 中皱眉!)给定一个列表列名称。循环成功运行,但即使中间 df 为 cached/persisted(回复:),先前的迭代也不会持续存在。

任何帮助、指示或更优雅的非循环解决方案将不胜感激(如果有更合适的函数式编程方法,请不要害怕尝试一点 Scala)!

给出以下 df:

unique_id row_id first_name last_name middle_name score
1000000 1000002 Simmons Bonnie Darnell 88
1000000 1000006 Dowell Crawford Anne 87
1000000 1000007 NULL Eric Victor 89
1000000 1000000 Zachary Fields Narik 86
1000000 1000003 NULL NULL Warren 92
1000000 1000008 Paulette Ronald Irvin 85
group_column = "unique_id"
concat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
  df_helper=df
  df_helper=df_helper.groupBy(group_column)\
  .agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
  .withColumn("sorted_list",col("collect_list."+str(i)))\
  .withColumn("first_item",slice(col("sorted_list"),1,1))\
  .withColumn(i,concat_ws(",",col("first_item")))\
  .drop("collect_list")\
  .drop("sorted_list")\
  .drop("first_item")
  print(i)
  df_final=df_final.join(df_helper,group_column,"inner")
  df_final.cache()
df_final.display() #I'm using databricks

我的结果如下:

unique_id middle_name
1000000 Warren

我想要的结果是:

unique_id first_name last_name middle_name
1000000 Simmons Eric Warren

Second set of tables if they don't pretty print above

我找到了解决我自己问题的方法:在我加入数据框时在我的数据框上添加一个 .collect() 调用,而不是 persist()cache();这将产生预期的数据帧。

group_column = "unique_id"
enter code hereconcat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
  df_helper=df
  df_helper=df_helper.groupBy(group_column)\
  .agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
  .withColumn("sorted_list",col("collect_list."+str(i)))\
  .withColumn("first_item",slice(col("sorted_list"),1,1))\
  .withColumn(i,concat_ws(",",col("first_item")))\
  .drop("collect_list")\
  .drop("sorted_list")\
  .drop("first_item")
  print(i)
  df_final=df_final.join(df_helper,group_column,"inner")
  df_final.collect()
df_final.display() #I'm using databricks