加入两个数据帧后最终数据帧上的 PySpark 条件格式
PySpark Conditional formatting on final DataFrame after joining two dataframes
PySpark DataFrame 场景:
- 有一个名为
DF
的 DataFrame。 DF
的两个主要列是 ID
和 Date
.
- 每个
ID
平均有 40 多个独特的 Date
(非连续日期)。
- 现在,有第二个名为
DF_date
的 DataFrame,它有一个名为 Date
的列。 Dates
中的日期介于 DF
. 的 'Date' 的最大值和最小值之间
- 现在,目标是用每个唯一 'ID' 的连续开始和结束日期填充
DF
(缺失的停产日期用 left join
填充 DF_date
和 DF
.
DF
+-------------+-------------+----------------+
| Date| Val| ID|
+-------------+-------------+----------------+
| 2021-07-01| 81119.73| Ax3838J|
| 2021-07-04| 81289.62| Ax3838J|
| 2021-07-05| 81385.62| Ax3838J|
| 2021-07-02| 81249.76| Bz3838J|
| 2021-07-05| 81324.28| Bz3838J|
| 2021-07-06| 81329.28| Bz3838J|
+-------------+-------------+----------------+
DF_date
+-------------+
| Date|
+-------------+
| 2021-07-01|
| 2021-07-02|
| 2021-07-03|
| 2021-07-04|
| 2021-07-05|
| 2021-07-06|
+-------------+
预期最终输出:
+-------------+-------------+----------------+
| Date| Val| ID|
+-------------+-------------+----------------+
| 2021-07-01| 81119.73| Ax3838J|
| 2021-07-02| 81119.73| Ax3838J|
| 2021-07-03| 81119.73| Ax3838J|
| 2021-07-04| 81289.62| Ax3838J|
| 2021-07-05| 81385.62| Ax3838J|
| 2021-07-02| 81249.76| Bz3838J|
| 2021-07-03| 81249.76| Bz3838J|
| 2021-07-04| 81249.76| Bz3838J|
| 2021-07-05| 81324.28| Bz3838J|
| 2021-07-06| 81329.28| Bz3838J|
+-------------+-------------+----------------+
你的问题没有意义。为什么要有一个包含开始日期和结束日期的 DF_date
数据框,使用它们来填写日期,然后求助于使用 DF
开始日期和结束日期。为什么不使用每个组的 DF
最小和最大日期来填充缺失的日期。
无论如何,这就是您根据 DF_Date
填写缺失日期的方式
根据您的评论,查看我的编辑
new = (DF.groupby('ID')
.agg(to_date(first('Date')).alias('min_date')#minimum date per group
,to_date(last('Date')).alias('max_date')#max date per group
,*[collect_list(i).alias(f"{i}") for i in DF.drop('ID').columns])#Dates and Val into an array for each group
#Explosion results into a new column 2 which ideally is the new date, Drop existing date and rename 2 to date
.selectExpr("ID","inline(arrays_zip(Date,Val,sequence(min_date,max_date,interval 1 day)))")
.drop('Date').withColumnRenamed('2','Date')
#Forward fill the Val column
.withColumn('Val', coalesce(last('val',True).over(Window.partitionBy('ID').orderBy('Date'))))
).show()
+-------+--------+----------+
| ID| Val| Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+
在上面的问题中,我后来按照@wwnde的建议意识到没有必要为日期创建一个单独的DF。
下面提供的代码也能达到目的 -
# Partition the data based on the client and order by DATE
window_fn = Window.partitionBy("ID").orderBy('DATE')
# the ranges of dates between the DATE value in the current row and the following row
next_date = F.coalesce(F.lead("DATE", 1).over(window_fn), F.col("DATE") + F.expr("interval 1 day"))
end_date_range = next_date - F.expr("interval 1 day")
# then using 'sequence' function to generate all intermediate dates
# exploded this array to fill in values for the missing dates.
final_result = DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))\
.withColumn("DATE", F.explode("Ranges"))\
.withColumn("DATE", F.to_timestamp("date", 'yyyy-MM-dd'))\
.drop("Ranges")
display(final_result)
PySpark DataFrame 场景:
- 有一个名为
DF
的 DataFrame。DF
的两个主要列是ID
和Date
. - 每个
ID
平均有 40 多个独特的Date
(非连续日期)。 - 现在,有第二个名为
DF_date
的 DataFrame,它有一个名为Date
的列。Dates
中的日期介于DF
. 的 'Date' 的最大值和最小值之间
- 现在,目标是用每个唯一 'ID' 的连续开始和结束日期填充
DF
(缺失的停产日期用left join
填充DF_date
和DF
.
DF
+-------------+-------------+----------------+
| Date| Val| ID|
+-------------+-------------+----------------+
| 2021-07-01| 81119.73| Ax3838J|
| 2021-07-04| 81289.62| Ax3838J|
| 2021-07-05| 81385.62| Ax3838J|
| 2021-07-02| 81249.76| Bz3838J|
| 2021-07-05| 81324.28| Bz3838J|
| 2021-07-06| 81329.28| Bz3838J|
+-------------+-------------+----------------+
DF_date
+-------------+
| Date|
+-------------+
| 2021-07-01|
| 2021-07-02|
| 2021-07-03|
| 2021-07-04|
| 2021-07-05|
| 2021-07-06|
+-------------+
预期最终输出:
+-------------+-------------+----------------+
| Date| Val| ID|
+-------------+-------------+----------------+
| 2021-07-01| 81119.73| Ax3838J|
| 2021-07-02| 81119.73| Ax3838J|
| 2021-07-03| 81119.73| Ax3838J|
| 2021-07-04| 81289.62| Ax3838J|
| 2021-07-05| 81385.62| Ax3838J|
| 2021-07-02| 81249.76| Bz3838J|
| 2021-07-03| 81249.76| Bz3838J|
| 2021-07-04| 81249.76| Bz3838J|
| 2021-07-05| 81324.28| Bz3838J|
| 2021-07-06| 81329.28| Bz3838J|
+-------------+-------------+----------------+
你的问题没有意义。为什么要有一个包含开始日期和结束日期的 DF_date
数据框,使用它们来填写日期,然后求助于使用 DF
开始日期和结束日期。为什么不使用每个组的 DF
最小和最大日期来填充缺失的日期。
无论如何,这就是您根据 DF_Date
根据您的评论,查看我的编辑
new = (DF.groupby('ID')
.agg(to_date(first('Date')).alias('min_date')#minimum date per group
,to_date(last('Date')).alias('max_date')#max date per group
,*[collect_list(i).alias(f"{i}") for i in DF.drop('ID').columns])#Dates and Val into an array for each group
#Explosion results into a new column 2 which ideally is the new date, Drop existing date and rename 2 to date
.selectExpr("ID","inline(arrays_zip(Date,Val,sequence(min_date,max_date,interval 1 day)))")
.drop('Date').withColumnRenamed('2','Date')
#Forward fill the Val column
.withColumn('Val', coalesce(last('val',True).over(Window.partitionBy('ID').orderBy('Date'))))
).show()
+-------+--------+----------+
| ID| Val| Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+
在上面的问题中,我后来按照@wwnde的建议意识到没有必要为日期创建一个单独的DF。
下面提供的代码也能达到目的 -
# Partition the data based on the client and order by DATE
window_fn = Window.partitionBy("ID").orderBy('DATE')
# the ranges of dates between the DATE value in the current row and the following row
next_date = F.coalesce(F.lead("DATE", 1).over(window_fn), F.col("DATE") + F.expr("interval 1 day"))
end_date_range = next_date - F.expr("interval 1 day")
# then using 'sequence' function to generate all intermediate dates
# exploded this array to fill in values for the missing dates.
final_result = DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))\
.withColumn("DATE", F.explode("Ranges"))\
.withColumn("DATE", F.to_timestamp("date", 'yyyy-MM-dd'))\
.drop("Ranges")
display(final_result)