从 pyspark 中的数据框创建历史数据

Create historical data from a dataframe in pyspark

我有一个数据框如下:

date some_quantity
... ...
2021-01-01 4
2021-01-02 1
2021-01-03 6
2021-01-04 2
2021-01-05 2
2021-01-06 8
2021-01-07 9
2021-01-08 1
... ...

我想为每个日历日创建历史数据,并在最后一步进行一些聚合。中间数据框应如下所示:

calendar_date date some_quantity
... ... ...
2021-01-03 2021-01-01 4
2021-01-03 2021-01-02 1
2021-01-04 ... ...
2021-01-04 2021-01-01 4
2021-01-04 2021-01-02 1
2021-01-04 2021-01-03 6
2021-01-05 ... ...
2021-01-05 2021-01-01 4
2021-01-05 2021-01-02 1
2021-01-05 2021-01-03 6
2021-01-05 2021-01-04 2
2021-01-06 ... ...
2021-01-06 2021-01-01 4
2021-01-06 2021-01-02 1
2021-01-06 2021-01-03 6
2021-01-06 2021-01-04 2
2021-01-06 2021-01-05 2
2021-01-06 ... ...

有了这个数据框,日历日期上的任何聚合都变得很容易(例如,指出当天之前售出的数量、平均 7 天、平均 30 天等)。

我尝试 运行 日历日期的 for 循环:

for i, date in enumerate(pd.data_range("2021-01-01","2021-03-01"):

   df_output = []

   df_transformed = df.where(F.col("date") < date)
   df_transformed = df_transformed.withColumn("calendar_date", date)

   if i == 0:
      df_output = df_transformed
   else:
      df_output = df_output.union(df_transformed)

但是,这是非常低效的,并且在我开始包含更多列时它就会崩溃。

是否可以创建一个包含日历日期的数据框并进行连接以重新创建我期望的数据框?

感谢您的帮助。

您可以简单地将日历数据框与您的主数据框连接起来,连接条件为“小于”:

# Let's call your main dataframe as `df`

# Extracting first and last date
_, min_date, max_date = (df
    .groupBy(F.lit(1))
    .agg(
        F.min('date').alias('min_date'),
        F.max('date').alias('max_date'),
    )
    .first()
)

# Then create a temporary dataframe to hold all calendar dates
dates = [{'calendar_date': str(d.date())} for d in pd.date_range(min_date, max_date)]
calendar_df = spark.createDataFrame(dates)
calendar_df.show(10, False)
# +-------------+
# |calendar_date|
# +-------------+
# |2021-01-01   |
# |2021-01-02   |
# |2021-01-03   |
# |2021-01-04   |
# |2021-01-05   |
# |2021-01-06   |
# |2021-01-07   |
# |2021-01-08   |
# +-------------+

# Now you can join to build your expected dataframe, note the join condition
(calendar_df
    .join(df, on=[calendar_df.calendar_date > df.date])
    .show()
)
# +-------------+----------+---+
# |calendar_date|      date|qty|
# +-------------+----------+---+
# |   2021-01-02|2021-01-01|  4|
# |   2021-01-03|2021-01-01|  4|
# |   2021-01-03|2021-01-02|  1|
# |   2021-01-04|2021-01-01|  4|
# |   2021-01-04|2021-01-02|  1|
# |   2021-01-04|2021-01-03|  6|
# |   2021-01-05|2021-01-01|  4|
# |   2021-01-05|2021-01-02|  1|
# |   2021-01-05|2021-01-03|  6|
# |   2021-01-05|2021-01-04|  2|
# |   2021-01-06|2021-01-01|  4|
# |   2021-01-06|2021-01-02|  1|
# |   2021-01-06|2021-01-03|  6|
# |   2021-01-06|2021-01-04|  2|
# |   2021-01-06|2021-01-05|  2|
# |   2021-01-07|2021-01-01|  4|
# |   2021-01-07|2021-01-02|  1|
# |   2021-01-07|2021-01-03|  6|
# |   2021-01-07|2021-01-04|  2|
# |   2021-01-07|2021-01-05|  2|
# +-------------+----------+---+
# only showing top 20 rows