用以前的值填充日历 table 中的行
Filling rows from calendar table with previous values
我是 SQL 的新手,来自 Python 和 R,并使用 Spark SQL 和 Databricks。我正在尝试完成一个基本查询,希望得到指导,尤其是解释 SQL 与我的问题相关的基本概念的指导。
我有一个包含完整连续日期的日历 table 和一个包含 date_added
、user_id
、sales
和 [= 的数据 table 17=] 列。数据 table 的日期不完整,因为并非每个用户在每个日期都处于活动状态。以下是每个 table.
的示例
日历Table
date
2020-01-01
2020-01-02
2020-01-03
2020-01-04
2020-01-05
2020-01-06
数据Table
date_added user_id sales price
2020-01-02 01 1 4.00
2020-01-05 01 3 4.00
2020-01-02 02 1 5.00
2020-01-03 02 1 5.00
2020-01-05 02 2 5.00
2020-01-03 03 2 1.00
2020-01-05 03 5 1.00
我想创建一个新的 table,其中每个日历 table 特定范围内的日期(活动 日期)是为每个定义的用户,除销售列外的所有列的空值都由该列中的以下值填充。沿着这些线的东西:
date user_id sales price
2020-01-02 01 1 4.00
2020-01-03 01 null 4.00
2020-01-04 01 null 4.00
2020-01-05 01 3 4.00
2020-01-02 02 1 5.00
2020-01-03 02 1 5.00
2020-01-04 02 null 5.00
2020-01-05 02 2 5.00
2020-01-02 03 null 1.00
2020-01-03 03 2 1.00
2020-01-04 03 null 1.00
2020-01-05 03 5 1.00
如能提供有关我如何处理此输出的任何指导,我们将不胜感激。我尝试在日期上使用 LEFT JOIN,但没有成功。我知道 UNION 运算符用于将 table 一个接一个地串联起来,但不知道如何在此处应用该方法。
您可以使用日历交叉连接用户 table 然后使用数据左连接 table:
spark.sql("""
SELECT date, dates.user_id, sales, COALESCE(data.price, dates.price) AS price
FROM (
SELECT user_id, price, date
FROM (SELECT user_id, FIRST(price) as price FROM data_table GROUP BY user_id)
CROSS JOIN calender_table
WHERE date >= (SELECT MIN(date_added) FROM data_table)
AND date <= (SELECT MAX(date_added) FROM data_table)
) dates
LEFT JOIN data_table data
ON dates.user_id = data.user_id
AND dates.date = data.date_added
""").show()
输出:
+----------+-------+-----+-----+
|date |user_id|sales|price|
+----------+-------+-----+-----+
|2020-01-02|01 |1 |4.0 |
|2020-01-03|01 |null |4.0 |
|2020-01-04|01 |null |4.0 |
|2020-01-05|01 |3 |4.0 |
|2020-01-02|02 |1 |5.0 |
|2020-01-03|02 |1 |5.0 |
|2020-01-04|02 |null |5.0 |
|2020-01-05|02 |2 |5.0 |
|2020-01-02|03 |null |1.0 |
|2020-01-03|03 |2 |1.0 |
|2020-01-04|03 |null |1.0 |
|2020-01-05|03 |5 |1.0 |
+----------+-------+-----+-----+
您也可以在不使用日历的情况下生成日期 table 使用 sequence
function. See my other answer here。
让你的原始数据框为df1
。然后你可以得到每个 id
的 min
, max
日期,并将其作为 `df2'.
from pyspark.sql import functions as f
from pyspark.sql import Window
w = Window.partitionBy('user_id').orderBy(f.desc('date_added'))
df2 = df1.groupBy('user_id') \
.agg(f.sequence(f.min('date_added'), f.max('date_added')).alias('date_added')) \
.withColumn('date_added', f.explode('date_added'))
df2.join(df, ['user_id', 'date_added'], 'left') \
.withColumn('price', f.first('price').over(w)) \
.orderBy('user_id', 'date_added') \
.show()
+-------+----------+-----+-----+
|user_id|date_added|sales|price|
+-------+----------+-----+-----+
| 1|2020-01-02| 1| 4.0|
| 1|2020-01-03| null| 4.0|
| 1|2020-01-04| null| 4.0|
| 1|2020-01-05| 3| 4.0|
| 2|2020-01-02| 1| 5.0|
| 2|2020-01-03| 1| 5.0|
| 2|2020-01-04| null| 5.0|
| 2|2020-01-05| 2| 5.0|
| 3|2020-01-03| 2| 1.0|
| 3|2020-01-04| null| 1.0|
| 3|2020-01-05| 5| 1.0|
+-------+----------+-----+-----+
我是 SQL 的新手,来自 Python 和 R,并使用 Spark SQL 和 Databricks。我正在尝试完成一个基本查询,希望得到指导,尤其是解释 SQL 与我的问题相关的基本概念的指导。
我有一个包含完整连续日期的日历 table 和一个包含 date_added
、user_id
、sales
和 [= 的数据 table 17=] 列。数据 table 的日期不完整,因为并非每个用户在每个日期都处于活动状态。以下是每个 table.
日历Table
date
2020-01-01
2020-01-02
2020-01-03
2020-01-04
2020-01-05
2020-01-06
数据Table
date_added user_id sales price
2020-01-02 01 1 4.00
2020-01-05 01 3 4.00
2020-01-02 02 1 5.00
2020-01-03 02 1 5.00
2020-01-05 02 2 5.00
2020-01-03 03 2 1.00
2020-01-05 03 5 1.00
我想创建一个新的 table,其中每个日历 table 特定范围内的日期(活动 日期)是为每个定义的用户,除销售列外的所有列的空值都由该列中的以下值填充。沿着这些线的东西:
date user_id sales price
2020-01-02 01 1 4.00
2020-01-03 01 null 4.00
2020-01-04 01 null 4.00
2020-01-05 01 3 4.00
2020-01-02 02 1 5.00
2020-01-03 02 1 5.00
2020-01-04 02 null 5.00
2020-01-05 02 2 5.00
2020-01-02 03 null 1.00
2020-01-03 03 2 1.00
2020-01-04 03 null 1.00
2020-01-05 03 5 1.00
如能提供有关我如何处理此输出的任何指导,我们将不胜感激。我尝试在日期上使用 LEFT JOIN,但没有成功。我知道 UNION 运算符用于将 table 一个接一个地串联起来,但不知道如何在此处应用该方法。
您可以使用日历交叉连接用户 table 然后使用数据左连接 table:
spark.sql("""
SELECT date, dates.user_id, sales, COALESCE(data.price, dates.price) AS price
FROM (
SELECT user_id, price, date
FROM (SELECT user_id, FIRST(price) as price FROM data_table GROUP BY user_id)
CROSS JOIN calender_table
WHERE date >= (SELECT MIN(date_added) FROM data_table)
AND date <= (SELECT MAX(date_added) FROM data_table)
) dates
LEFT JOIN data_table data
ON dates.user_id = data.user_id
AND dates.date = data.date_added
""").show()
输出:
+----------+-------+-----+-----+
|date |user_id|sales|price|
+----------+-------+-----+-----+
|2020-01-02|01 |1 |4.0 |
|2020-01-03|01 |null |4.0 |
|2020-01-04|01 |null |4.0 |
|2020-01-05|01 |3 |4.0 |
|2020-01-02|02 |1 |5.0 |
|2020-01-03|02 |1 |5.0 |
|2020-01-04|02 |null |5.0 |
|2020-01-05|02 |2 |5.0 |
|2020-01-02|03 |null |1.0 |
|2020-01-03|03 |2 |1.0 |
|2020-01-04|03 |null |1.0 |
|2020-01-05|03 |5 |1.0 |
+----------+-------+-----+-----+
您也可以在不使用日历的情况下生成日期 table 使用 sequence
function. See my other answer here。
让你的原始数据框为df1
。然后你可以得到每个 id
的 min
, max
日期,并将其作为 `df2'.
from pyspark.sql import functions as f
from pyspark.sql import Window
w = Window.partitionBy('user_id').orderBy(f.desc('date_added'))
df2 = df1.groupBy('user_id') \
.agg(f.sequence(f.min('date_added'), f.max('date_added')).alias('date_added')) \
.withColumn('date_added', f.explode('date_added'))
df2.join(df, ['user_id', 'date_added'], 'left') \
.withColumn('price', f.first('price').over(w)) \
.orderBy('user_id', 'date_added') \
.show()
+-------+----------+-----+-----+
|user_id|date_added|sales|price|
+-------+----------+-----+-----+
| 1|2020-01-02| 1| 4.0|
| 1|2020-01-03| null| 4.0|
| 1|2020-01-04| null| 4.0|
| 1|2020-01-05| 3| 4.0|
| 2|2020-01-02| 1| 5.0|
| 2|2020-01-03| 1| 5.0|
| 2|2020-01-04| null| 5.0|
| 2|2020-01-05| 2| 5.0|
| 3|2020-01-03| 2| 1.0|
| 3|2020-01-04| null| 1.0|
| 3|2020-01-05| 5| 1.0|
+-------+----------+-----+-----+