使用时间序列列创建 PySpark 数据框
Create PySpark dataframe with timeseries column
我有一个初始 PySpark 数据框,我想从中获取日期列中的 MIN
和 MAX
,然后使用时间序列(每日日期)创建一个新的 PySpark 数据框,使用来自我初始数据框的 MIN
和 MAX
。
然后我将使用它与我的初始数据框连接并查找缺失的日期(在我的初始 DF 的其余列中为空)。
我尝试了很多不同的方法来构建时间序列 DF,但它似乎在 PySpark 中不起作用。有什么建议吗?
可以像这样提取最大列的值:
df.groupBy().agg(F.max('col_name')).head()[0]
日期范围 df 可以这样创建:
df2 = spark.sql("SELECT sequence(to_date('2000-01-01'), to_date('2000-02-02'), interval 1 day) as date_col").withColumn('date_col', F.explode('date_col'))
然后join
。
完整示例:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, '2022-04-01'),(2, '2022-04-05')], ['id', 'df1_date']).select('id', F.col('df1_date').cast('date'))
df1.show()
# +---+----------+
# | id| df1_date|
# +---+----------+
# | 1|2022-04-01|
# | 2|2022-04-05|
# +---+----------+
min_date = df1.groupBy().agg(F.min('df1_date')).head()[0]
max_date = df1.groupBy().agg(F.max('df1_date')).head()[0]
df2 = spark.sql(f"SELECT sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day) as df2_date").withColumn('df2_date', F.explode('df2_date'))
df3 = df2.join(df1, df1.df1_date == df2.df2_date, 'left')
df3.show()
# +----------+----+----------+
# | df2_date| id| df1_date|
# +----------+----+----------+
# |2022-04-01| 1|2022-04-01|
# |2022-04-02|null| null|
# |2022-04-03|null| null|
# |2022-04-04|null| null|
# |2022-04-05| 2|2022-04-05|
# +----------+----+----------+
我有一个初始 PySpark 数据框,我想从中获取日期列中的 MIN
和 MAX
,然后使用时间序列(每日日期)创建一个新的 PySpark 数据框,使用来自我初始数据框的 MIN
和 MAX
。
然后我将使用它与我的初始数据框连接并查找缺失的日期(在我的初始 DF 的其余列中为空)。
我尝试了很多不同的方法来构建时间序列 DF,但它似乎在 PySpark 中不起作用。有什么建议吗?
可以像这样提取最大列的值:
df.groupBy().agg(F.max('col_name')).head()[0]
日期范围 df 可以这样创建:
df2 = spark.sql("SELECT sequence(to_date('2000-01-01'), to_date('2000-02-02'), interval 1 day) as date_col").withColumn('date_col', F.explode('date_col'))
然后join
。
完整示例:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, '2022-04-01'),(2, '2022-04-05')], ['id', 'df1_date']).select('id', F.col('df1_date').cast('date'))
df1.show()
# +---+----------+
# | id| df1_date|
# +---+----------+
# | 1|2022-04-01|
# | 2|2022-04-05|
# +---+----------+
min_date = df1.groupBy().agg(F.min('df1_date')).head()[0]
max_date = df1.groupBy().agg(F.max('df1_date')).head()[0]
df2 = spark.sql(f"SELECT sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day) as df2_date").withColumn('df2_date', F.explode('df2_date'))
df3 = df2.join(df1, df1.df1_date == df2.df2_date, 'left')
df3.show()
# +----------+----+----------+
# | df2_date| id| df1_date|
# +----------+----+----------+
# |2022-04-01| 1|2022-04-01|
# |2022-04-02|null| null|
# |2022-04-03|null| null|
# |2022-04-04|null| null|
# |2022-04-05| 2|2022-04-05|
# +----------+----+----------+