Expand PySpark dataframe for missing dates
I have a PySpark dataframe with measures for several (key1, key2, key3, date) sets, i.e.:
+----+-----+----+----------+-----+-----+
|key1| key2|key3|      date|val_1|val_2|
+----+-----+----+----------+-----+-----+
| pk1|name1|  VA|2022-03-06|    0|    3|
| pk1|name1|  VA|2022-03-07|    2|    4|
| pk1|name1|  VA|2022-03-09|    3|    4|
| pk2|name2|  NC|2022-03-06|    4|    1|
| pk2|name2|  NC|2022-03-08|    2|    6|
| pk2|name2|  NC|2022-03-09|    1|    4|
+----+-----+----+----------+-----+-----+
This table has roughly 5,600 unique (key1, key2, key3) tuples. I want to fill in the missing dates so that every key tuple has a contiguous one-day-interval sequence of dates. The resulting table should be:
+----+-----+----+----------+-----+-----+
|key1| key2|key3|      date|val_1|val_2|
+----+-----+----+----------+-----+-----+
| pk1|name1|  VA|2022-03-06|    0|    3|
| pk1|name1|  VA|2022-03-07|    2|    4|
| pk1|name1|  VA|2022-03-08|   NA|   NA|
| pk1|name1|  VA|2022-03-09|    3|    4|
| pk2|name2|  NC|2022-03-06|    4|    1|
| pk2|name2|  NC|2022-03-07|   NA|   NA|
| pk2|name2|  NC|2022-03-08|    2|    6|
| pk2|name2|  NC|2022-03-09|    1|    4|
+----+-----+----+----------+-----+-----+
Here is what I tried:
import datetime

from pyspark.sql import functions as F
minDate = df.select(F.min("date")).first()["min(date)"]
maxDate = df.select(F.max("date")).first()["max(date)"]
dateList = ",".join([str(maxDate - datetime.timedelta(days=x)) for x in range((maxDate - minDate).days + 1)])
df = df.select("key1", "key2", "key3", F.explode(F.split(dateList, ",")).alias("date"))
I pulled this approach from another SO answer. My plan was to build this "complete" (key1, key2, key3, date) dataframe and then join it back to the original. The error I get is:
You're referencing the column `2022-03-20,2022-03-19,2022-03-18,2022-03-17,2022-03-16,2022-03-15,2022-03-14,2022-03-13,2022-03-12,2022-03-11,2022-03-10,2022-03-09,2022-03-08,2022-03-07,2022-03-06`, but it is missing from the schema.
It should be:
df = df.select("key1", "key2", "key3", F.explode(F.split(F.lit(dateList), ",")).alias("date"))
Use lit() to create a column from the literal string value. By the way, you should drop duplicates on "key1", "key2", "key3" first.
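Putting the lit() fix together with the deduplication, a minimal sketch of that route might look like this (assuming df and dateList are defined as in the question; fullDates is just an illustrative name):

from pyspark.sql import functions as F

# one row per unique key tuple, each paired with every date in the range
fullDates = (df.select("key1", "key2", "key3").dropDuplicates()
             .select("key1", "key2", "key3",
                     F.explode(F.split(F.lit(dateList), ",")).alias("date")))
# note: these dates are strings; cast with F.to_date("date") before joining
# if df.date is a DateType column

That said, you can skip building the date string in Python entirely and generate the range in Spark itself: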
from pyspark.sql.functions import expr, to_date

df = df.withColumn('date', to_date('date'))  # cast date from string if needed

new = (df.groupby('key1', 'key2', 'key3')
       # per-key max and min dates bound the range to generate
       .agg(expr('max(date)').alias('max_date'), expr('min(date)').alias('min_date'))
       # sequence generates every date in the range; explode makes one row each
       .withColumn('date', expr('explode(sequence(min_date, max_date, interval 1 day))'))
       .drop('max_date', 'min_date'))

# join the expanded frame back to df
df.join(new, how='right', on=['key1', 'key2', 'key3', 'date']).show()
+----+-----+----+----------+-----+-----+
|key1| key2|key3| date|val_1|val_2|
+----+-----+----+----------+-----+-----+
| pk1|name1| VA|2022-03-06| 0| 3|
| pk1|name1| VA|2022-03-07| 2| 4|
| pk1|name1| VA|2022-03-08| null| null|
| pk1|name1| VA|2022-03-09| 3| 4|
| pk2|name2| NC|2022-03-06| 4| 1|
| pk2|name2| NC|2022-03-07| null| null|
| pk2|name2| NC|2022-03-08| 2| 6|
| pk2|name2| NC|2022-03-09| 1| 4|
+----+-----+----+----------+-----+-----+
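Note that sequence(min_date, max_date, interval 1 day) (available since Spark 2.4) is inclusive on both ends and, because the bounds come from a groupby, expands each key tuple only over its own observed date range. If you want every key to span the global date range instead, a sketch of one way to do that (same df as above):

from pyspark.sql import functions as F

# one-row frame holding the global date bounds, cross-joined onto the unique keys
bounds = df.select(F.min('date').alias('min_date'), F.max('date').alias('max_date'))
new = (df.select('key1', 'key2', 'key3').dropDuplicates()
       .crossJoin(bounds)
       .withColumn('date', F.expr('explode(sequence(min_date, max_date, interval 1 day))'))
       .drop('min_date', 'max_date'))

In this example both keys already cover 2022-03-06 through 2022-03-09, so the two variants happen to produce the same output.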
I've accepted wwnde's answer, but I thought I'd also post what I managed to get working before reading theirs.
import datetime

from pyspark.sql import functions as F

# global min/max dates across all keys
minDate = df.select(F.min("date")).first()["min(date)"]
maxDate = df.select(F.max("date")).first()["max(date)"]
dateList = [maxDate - datetime.timedelta(days=x) for x in range((maxDate - minDate).days + 1)]

# every unique key tuple paired with the full date list
fullDateDf = (
    df
    .select(["key1", "key2", "key3"])
    .dropDuplicates()
    .withColumn("date", F.array([F.lit(x) for x in dateList]))
)
# one row per (key tuple, date)
fullDateDf = fullDateDf.select(
    [
        "key1",
        "key2",
        "key3",
        F.explode(F.col("date")).alias("date")
    ]
)
df = (
    df
    .join(
        fullDateDf,
        on=["key1", "key2", "key3", "date"],
        how="outer"
    )
)
I think wwnde's answer is cleaner, but I figured I'd share this alternative anyway.
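Since fullDateDf contains every (key1, key2, key3, date) combination in the global range, the rows of df are a subset of it on the join keys, so the outer join behaves the same as a right join on fullDateDf here. Sorting the result makes the filled-in gap rows easy to verify:

df.orderBy("key1", "key2", "key3", "date").show()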