在 group by 上分解日期间隔并在 pyspark 中获取最后一个值

Explode date interval over a group by and take last value in pyspark

我有一个包含一些产品、日期和值的数据框。现在,日期在我要填写的记录值之间有不同的差距。这样我从第一次看到产品到最后一次看到每个小时都有一个记录值,如果没有记录我想使用最新值。

所以,我有一个像这样的数据框:

| ProductId | Date                          | Value |
|-----------|-------------------------------|-------|
| 1         | 2020-03-12T00:00:00.000+0000  | 4     |
| 1         | 2020-03-12T01:00:00.000+0000  | 2     |
| 2         | 2020-03-12T01:00:00.000+0000  | 3     |
| 2         | 2020-03-12T03:00:00.000+0000  | 4     |
| 1         | 2020-03-12T05:00:00.000+0000  | 4     |
| 3         | 2020-03-12T05:00:00.000+0000  | 2     |

我想创建一个如下所示的新数据框:

| ProductId | Date                          | Value |
|-----------|-------------------------------|-------|
| 1         | 2020-03-12T00:00:00.000+0000  | 4     |
| 1         | 2020-03-12T01:00:00.000+0000  | 2     |
| 1         | 2020-03-12T02:00:00.000+0000  | 2     |
| 1         | 2020-03-12T03:00:00.000+0000  | 2     |
| 1         | 2020-03-12T04:00:00.000+0000  | 2     |
| 1         | 2020-03-12T05:00:00.000+0000  | 4     |
| 2         | 2020-03-12T01:00:00.000+0000  | 3     |
| 2         | 2020-03-12T02:00:00.000+0000  | 3     |
| 2         | 2020-03-12T03:00:00.000+0000  | 4     |
| 3         | 2020-03-12T05:00:00.000+0000  | 2     |

到目前为止我的代码:

def generate_date_series(start, stop):
  start = datetime.strptime(start, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
  stop = datetime.strptime(stop, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
  return [start + datetime.timedelta(hours=x) for x in range(0, (stop-start).hours + 1)]

spark.udf.register("generate_date_series", generate_date_series, ArrayType(TimestampType()))

df = df.withColumn("max", max(col("Date")).over(Window.partitionBy("ProductId"))) \
       .withColumn("min", min(col("Date")).over(Window.partitionBy("ProductId"))) \
       .withColumn("Dato", explode(generate_date_series(col("min"), col("max"))) \
                          .over(Window.partitionBy("ProductId").orderBy(col("Dato").desc())))

window_over_ids = (Window.partitionBy("ProductId").rangeBetween(Window.unboundedPreceding, -1).orderBy("Date"))

df = df.withColumn("Value", last("Value", ignorenulls=True).over(window_over_ids))

错误:

TypeError: strptime() argument 1 must be str, not Column

所以第一个问题显然是如何正确创建和调用 udf,这样我就不会 运行 出现上述错误。

第二个问题是如何完成任务,以便获得我想要的数据帧?

经过一番搜索和试验,我找到了解决方案。我定义了一个 udf returns 两个日期之间的日期范围,间隔为 1 小时。然后我做一个前向填充

我用以下代码解决了这个问题:

def missing_hours(t1, t2):
    return [t1 + timedelta(hours=x) for x in range(0, int((t2-t1).total_seconds()/3600))]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

window = Window.partitionBy("ProductId").orderBy("Date")

df_missing = df.withColumn("prev_timestamp", lag(col("Date"), 1, None).over(window)) \
                        .filter(col("prev_timestamp").isNotNull()) \
                        .withColumn("Date", explode(missing_hours_udf(col("prev_timestamp"), col("Date")))) \
                        .withColumn("Value", lit(None)) \
                        .drop("prev_timestamp")

df = df_original.union(df_missing)

window = Window.partitionBy("ProductId").orderBy("Date") \
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_values_column = last(df['Value'], ignorenulls=True).over(window)

# do the fill
df = df.withColumn('Value', filled_values_column)