List is unordered even after using windowing with collect_set
So I'm trying to collect_set a group of dates from a dataframe. The problem I'm facing is that the dates don't come out in the same order as they appear in the dataframe.
Example dataframe (this comes from a much larger dataset; basically this dataframe tracks the
beginning date of the week for every single day in a year):
+--------+-----------+----------+
|year_num|week_beg_dt|    cal_dt|
+--------+-----------+----------+
|    2013| 2012-12-31|2012-12-31|
|    2013| 2012-12-31|2013-01-03|
|    2013| 2013-01-07|2013-01-07|
|    2013| 2013-01-07|2013-01-12|
|    2013| 2013-01-14|2013-01-14|
|    2013| 2013-01-14|2013-01-15|
|    2014| 2014-01-01|2014-01-01|
|    2014| 2014-01-01|2014-01-05|
|    2014| 2014-01-07|2014-01-07|
|    2014| 2014-01-07|2014-01-12|
|    2014| 2014-01-15|2014-01-15|
|    2014| 2014-01-15|2014-01-16|
+--------+-----------+----------+
What I'm trying to get to is this:
+--------+-------------------------------------+
|year_num|dates                                |
+--------+-------------------------------------+
|    2013|[2012-12-31, 2013-01-07, 2013-01-14] |
|    2014|[2014-01-01, 2014-01-07, 2014-01-15] |
+--------+-------------------------------------+
I've tried doing it with a window, since collect_set together with groupBy results in an unordered set:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('year_num').orderBy('week_beg_dt')
business_days_ = df2.withColumn('dates', F.collect_set('week_beg_dt').over(w)) \
                    .groupBy('year_num') \
                    .agg(F.max('dates').alias('dates')) \
                    .collect()
But I still get unordered sets. Any suggestions on what I'm doing wrong and how to fix it?
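If it helps to reproduce this, below is a minimal, self-contained sketch of the setup (only a subset of the rows above; building spark via getOrCreate() is an assumption) together with the window-based attempt:

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

# A small subset of the example data from the question.
df2 = spark.createDataFrame(
    [("2013", "2012-12-31", "2012-12-31"),
     ("2013", "2012-12-31", "2013-01-03"),
     ("2013", "2013-01-07", "2013-01-07"),
     ("2014", "2014-01-01", "2014-01-01"),
     ("2014", "2014-01-15", "2014-01-15")],
    ["year_num", "week_beg_dt", "cal_dt"],
)

# The window-based attempt from above: collect_set gives no ordering
# guarantee, so the resulting array can come back in any order even
# though the window itself is ordered by week_beg_dt.
w = Window.partitionBy("year_num").orderBy("week_beg_dt")
df2.withColumn("dates", F.collect_set("week_beg_dt").over(w)) \
   .groupBy("year_num") \
   .agg(F.max("dates").alias("dates")) \
   .show(truncate=False)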
For Spark 2.4+, use the built-in array_sort function on top of collect_set to get an ordered list.
Example:
from pyspark.sql.functions import col, collect_set, array_sort

df1.show()
#+--------+-----------+----------+
#|year_num|week_beg_dt| cal_dt|
#+--------+-----------+----------+
#| 2013| 2012-12-31|2012-12-31|
#| 2013| 2012-12-31|2012-12-31|
#| 2013| 2013-01-07|2013-01-03|
#+--------+-----------+----------+
#without array_sort
df1.groupBy("year_num").agg(collect_set(col("week_beg_dt"))).show(10,False)
#+--------+------------------------+
#|year_num|collect_set(week_beg_dt)|
#+--------+------------------------+
#|2013 |[2013-01-07, 2012-12-31]|
#+--------+------------------------+
#using array_sort
df1.groupBy("year_num").agg(array_sort(collect_set(col("week_beg_dt")))).show(10,False)
#+--------+------------------------------------+
#|year_num|array_sort(collect_set(week_beg_dt))|
#+--------+------------------------------------+
#|2013 |[2012-12-31, 2013-01-07] |
#+--------+------------------------------------+
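sort_array (in pyspark.sql.functions since Spark 1.5) should also give the same ascending result here and can avoid the UDF shown below for older versions; a minimal sketch, where the dates alias is just an arbitrary name:

from pyspark.sql.functions import col, collect_set, sort_array

# sort_array sorts ascending by default (pass asc=False for descending).
df1.groupBy("year_num") \
   .agg(sort_array(collect_set(col("week_beg_dt"))).alias("dates")) \
   .show(10, False)
# Expected: [2012-12-31, 2013-01-07] for year_num 2013.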
For earlier versions of Spark:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# udf to sort the collected array
sort_arr_udf = udf(lambda x: sorted(x), ArrayType(StringType()))
df1.groupBy("year_num").agg(sort_arr_udf(collect_set(col("week_beg_dt")))).show(10, False)
#+--------+----------------------------------------+
#|year_num|<lambda>(collect_set(week_beg_dt, 0, 0))|
#+--------+----------------------------------------+
#|2013 |[2012-12-31, 2013-01-07] |
#+--------+----------------------------------------+
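The <lambda>(collect_set(...)) header in the output above is just the auto-generated column name; adding an .alias() (the name dates below is arbitrary) gives a cleaner one:

# Same UDF-based aggregation, only with a readable output column name.
df1.groupBy("year_num") \
   .agg(sort_arr_udf(collect_set(col("week_beg_dt"))).alias("dates")) \
   .show(10, False)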