用以前的值填充日历 table 中的行

Filling rows from calendar table with previous values

我是 SQL 的新手,来自 Python 和 R,并使用 Spark SQL 和 Databricks。我正在尝试完成一个基本查询,希望得到指导,尤其是解释 SQL 与我的问题相关的基本概念的指导。

我有一个包含完整连续日期的日历 table 和一个包含 date_addeduser_idsales 和 [= 的数据 table 17=] 列。数据 table 的日期不完整,因为并非每个用户在每个日期都处于活动状态。以下是每个 table.

的示例

日历Table

date
2020-01-01
2020-01-02
2020-01-03
2020-01-04
2020-01-05
2020-01-06

数据Table

date_added     user_id    sales    price
2020-01-02     01         1        4.00
2020-01-05     01         3        4.00
2020-01-02     02         1        5.00
2020-01-03     02         1        5.00
2020-01-05     02         2        5.00
2020-01-03     03         2        1.00
2020-01-05     03         5        1.00

我想创建一个新的 table,其中每个日历 table 特定范围内的日期(活动 日期)是为每个定义的用户,除销售列外的所有列的空值都由该列中的以下值填充。沿着这些线的东西:

date           user_id    sales    price         
2020-01-02     01         1        4.00
2020-01-03     01         null     4.00
2020-01-04     01         null     4.00
2020-01-05     01         3        4.00
2020-01-02     02         1        5.00
2020-01-03     02         1        5.00
2020-01-04     02         null     5.00
2020-01-05     02         2        5.00
2020-01-02     03         null     1.00
2020-01-03     03         2        1.00
2020-01-04     03         null     1.00
2020-01-05     03         5        1.00

如能提供有关我如何处理此输出的任何指导,我们将不胜感激。我尝试在日期上使用 LEFT JOIN,但没有成功。我知道 UNION 运算符用于将 table 一个接一个地串联起来,但不知道如何在此处应用该方法。

您可以使用日历交叉连接用户 table 然后使用数据左连接 table:

spark.sql("""
  SELECT  date, dates.user_id, sales, COALESCE(data.price, dates.price) AS price
  FROM    (
      SELECT  user_id, price, date
      FROM    (SELECT user_id, FIRST(price) as price FROM data_table GROUP BY user_id)
      CROSS JOIN calender_table
      WHERE   date >= (SELECT MIN(date_added) FROM data_table)
      AND     date <= (SELECT MAX(date_added) FROM data_table)
  )   dates
  LEFT JOIN data_table data
  ON      dates.user_id = data.user_id
  AND     dates.date = data.date_added
""").show()

输出:

+----------+-------+-----+-----+
|date      |user_id|sales|price|
+----------+-------+-----+-----+
|2020-01-02|01     |1    |4.0  |
|2020-01-03|01     |null |4.0  |
|2020-01-04|01     |null |4.0  |
|2020-01-05|01     |3    |4.0  |
|2020-01-02|02     |1    |5.0  |
|2020-01-03|02     |1    |5.0  |
|2020-01-04|02     |null |5.0  |
|2020-01-05|02     |2    |5.0  |
|2020-01-02|03     |null |1.0  |
|2020-01-03|03     |2    |1.0  |
|2020-01-04|03     |null |1.0  |
|2020-01-05|03     |5    |1.0  |
+----------+-------+-----+-----+

您也可以在不使用日历的情况下生成日期 table 使用 sequence function. See my other answer here

让你的原始数据框为df1。然后你可以得到每个 idmin, max 日期,并将其作为 `df2'.

from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('user_id').orderBy(f.desc('date_added'))

df2 = df1.groupBy('user_id') \
  .agg(f.sequence(f.min('date_added'), f.max('date_added')).alias('date_added')) \
  .withColumn('date_added', f.explode('date_added'))

df2.join(df, ['user_id', 'date_added'], 'left') \
   .withColumn('price', f.first('price').over(w)) \
   .orderBy('user_id', 'date_added') \
   .show()

+-------+----------+-----+-----+
|user_id|date_added|sales|price|
+-------+----------+-----+-----+
|      1|2020-01-02|    1|  4.0|
|      1|2020-01-03| null|  4.0|
|      1|2020-01-04| null|  4.0|
|      1|2020-01-05|    3|  4.0|
|      2|2020-01-02|    1|  5.0|
|      2|2020-01-03|    1|  5.0|
|      2|2020-01-04| null|  5.0|
|      2|2020-01-05|    2|  5.0|
|      3|2020-01-03|    2|  1.0|
|      3|2020-01-04| null|  1.0|
|      3|2020-01-05|    5|  1.0|
+-------+----------+-----+-----+