在 Pyspark 中为组连接函数持久化循环数据帧
Persisting loop dataframes for group concat functions in Pyspark
我正在尝试将 spark 数据框聚合到一个唯一 ID,从给定排序列的该 ID 的该列中选择第一个非空值。基本上复制 MySQL 的 group_concat 功能。
此处 的 SO post 对于复制单个列的 group_concat 非常有帮助。我需要为动态列列表执行此操作。
我宁愿不必为每一列复制此代码(十多个,将来可能是动态的),所以我试图在一个循环中实现(我知道在 spark 中皱眉!)给定一个列表列名称。循环成功运行,但即使中间 df 为 cached/persisted(回复:),先前的迭代也不会持续存在。
任何帮助、指示或更优雅的非循环解决方案将不胜感激(如果有更合适的函数式编程方法,请不要害怕尝试一点 Scala)!
给出以下 df:
unique_id
row_id
first_name
last_name
middle_name
score
1000000
1000002
Simmons
Bonnie
Darnell
88
1000000
1000006
Dowell
Crawford
Anne
87
1000000
1000007
NULL
Eric
Victor
89
1000000
1000000
Zachary
Fields
Narik
86
1000000
1000003
NULL
NULL
Warren
92
1000000
1000008
Paulette
Ronald
Irvin
85
group_column = "unique_id"
concat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
df_helper=df
df_helper=df_helper.groupBy(group_column)\
.agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
.withColumn("sorted_list",col("collect_list."+str(i)))\
.withColumn("first_item",slice(col("sorted_list"),1,1))\
.withColumn(i,concat_ws(",",col("first_item")))\
.drop("collect_list")\
.drop("sorted_list")\
.drop("first_item")
print(i)
df_final=df_final.join(df_helper,group_column,"inner")
df_final.cache()
df_final.display() #I'm using databricks
我的结果如下:
unique_id
middle_name
1000000
Warren
我想要的结果是:
unique_id
first_name
last_name
middle_name
1000000
Simmons
Eric
Warren
Second set of tables if they don't pretty print above
我找到了解决我自己问题的方法:在我加入数据框时在我的数据框上添加一个 .collect()
调用,而不是 persist()
或 cache()
;这将产生预期的数据帧。
group_column = "unique_id"
enter code hereconcat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
df_helper=df
df_helper=df_helper.groupBy(group_column)\
.agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
.withColumn("sorted_list",col("collect_list."+str(i)))\
.withColumn("first_item",slice(col("sorted_list"),1,1))\
.withColumn(i,concat_ws(",",col("first_item")))\
.drop("collect_list")\
.drop("sorted_list")\
.drop("first_item")
print(i)
df_final=df_final.join(df_helper,group_column,"inner")
df_final.collect()
df_final.display() #I'm using databricks
我正在尝试将 spark 数据框聚合到一个唯一 ID,从给定排序列的该 ID 的该列中选择第一个非空值。基本上复制 MySQL 的 group_concat 功能。
此处
我宁愿不必为每一列复制此代码(十多个,将来可能是动态的),所以我试图在一个循环中实现(我知道在 spark 中皱眉!)给定一个列表列名称。循环成功运行,但即使中间 df 为 cached/persisted(回复:
任何帮助、指示或更优雅的非循环解决方案将不胜感激(如果有更合适的函数式编程方法,请不要害怕尝试一点 Scala)!
给出以下 df:
unique_id | row_id | first_name | last_name | middle_name | score |
---|---|---|---|---|---|
1000000 | 1000002 | Simmons | Bonnie | Darnell | 88 |
1000000 | 1000006 | Dowell | Crawford | Anne | 87 |
1000000 | 1000007 | NULL | Eric | Victor | 89 |
1000000 | 1000000 | Zachary | Fields | Narik | 86 |
1000000 | 1000003 | NULL | NULL | Warren | 92 |
1000000 | 1000008 | Paulette | Ronald | Irvin | 85 |
group_column = "unique_id"
concat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
df_helper=df
df_helper=df_helper.groupBy(group_column)\
.agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
.withColumn("sorted_list",col("collect_list."+str(i)))\
.withColumn("first_item",slice(col("sorted_list"),1,1))\
.withColumn(i,concat_ws(",",col("first_item")))\
.drop("collect_list")\
.drop("sorted_list")\
.drop("first_item")
print(i)
df_final=df_final.join(df_helper,group_column,"inner")
df_final.cache()
df_final.display() #I'm using databricks
我的结果如下:
unique_id | middle_name |
---|---|
1000000 | Warren |
我想要的结果是:
unique_id | first_name | last_name | middle_name |
---|---|---|---|
1000000 | Simmons | Eric | Warren |
Second set of tables if they don't pretty print above
我找到了解决我自己问题的方法:在我加入数据框时在我的数据框上添加一个 .collect()
调用,而不是 persist()
或 cache()
;这将产生预期的数据帧。
group_column = "unique_id"
enter code hereconcat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
df_helper=df
df_helper=df_helper.groupBy(group_column)\
.agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
.withColumn("sorted_list",col("collect_list."+str(i)))\
.withColumn("first_item",slice(col("sorted_list"),1,1))\
.withColumn(i,concat_ws(",",col("first_item")))\
.drop("collect_list")\
.drop("sorted_list")\
.drop("first_item")
print(i)
df_final=df_final.join(df_helper,group_column,"inner")
df_final.collect()
df_final.display() #I'm using databricks