加入两个数据帧后最终数据帧上的 PySpark 条件格式

PySpark Conditional formatting on final DataFrame after joining two dataframes

PySpark DataFrame 场景:

  1. 有一个名为 DF 的 DataFrame。 DF 的两个主要列是 IDDate.
  2. 每个 ID 平均有 40 多个独特的 Date(非连续日期)。
  3. 现在,有第二个名为 DF_date 的 DataFrame,它有一个名为 Date 的列。 Dates 中的日期介于 DF.
  4. 的 'Date' 的最大值和最小值之间
  5. 现在,目标是用每个唯一 'ID' 的连续开始和结束日期填充 DF(缺失的停产日期用 left join 填充 DF_dateDF.

DF

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

DF_date

+-------------+
|         Date|
+-------------+
|   2021-07-01|
|   2021-07-02|
|   2021-07-03|
|   2021-07-04|
|   2021-07-05|
|   2021-07-06|
+-------------+

预期最终输出:

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-02|     81119.73|         Ax3838J|
|   2021-07-03|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-03|     81249.76|         Bz3838J|
|   2021-07-04|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

你的问题没有意义。为什么要有一个包含开始日期和结束日期的 DF_date 数据框,使用它们来填写日期,然后求助于使用 DF 开始日期和结束日期。为什么不使用每个组的 DF 最小和最大日期来填充缺失的日期。

无论如何,这就是您根据 DF_Date

填写缺失日期的方式

根据您的评论,查看我的编辑

  new = (DF.groupby('ID')
       .agg(to_date(first('Date')).alias('min_date')#minimum date per group
       ,to_date(last('Date')).alias('max_date')#max date per group
       ,*[collect_list(i).alias(f"{i}") for i in DF.drop('ID').columns])#Dates and Val into an array for each group
       
       #Explosion results into a new column 2 which ideally is the new date, Drop existing date and rename 2 to date
       .selectExpr("ID","inline(arrays_zip(Date,Val,sequence(min_date,max_date,interval 1 day)))")
       .drop('Date').withColumnRenamed('2','Date')
       #Forward fill the Val column
      .withColumn('Val', coalesce(last('val',True).over(Window.partitionBy('ID').orderBy('Date'))))
       
      ).show()

+-------+--------+----------+
|     ID|     Val|      Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+

在上面的问题中,我后来按照@wwnde的建议意识到没有必要为日期创建一个单独的DF。

下面提供的代码也能达到目的 -

# Partition the data based on the client and order by DATE
window_fn = Window.partitionBy("ID").orderBy('DATE')


# the ranges of dates between the DATE value in the current row and the following row 

next_date = F.coalesce(F.lead("DATE", 1).over(window_fn), F.col("DATE") + F.expr("interval 1 day"))

end_date_range = next_date - F.expr("interval 1 day")

# then using 'sequence' function to generate all intermediate dates
# exploded this array to fill in values for the missing dates.

final_result = DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))\
  .withColumn("DATE", F.explode("Ranges"))\
  .withColumn("DATE", F.to_timestamp("date", 'yyyy-MM-dd'))\
  .drop("Ranges")

display(final_result)