在 group by 上分解日期间隔并在 pyspark 中获取最后一个值
Explode date interval over a group by and take last value in pyspark
我有一个包含一些产品、日期和值的数据框。现在,日期在我要填写的记录值之间有不同的差距。这样我从第一次看到产品到最后一次看到每个小时都有一个记录值,如果没有记录我想使用最新值。
所以,我有一个像这样的数据框:
| ProductId | Date | Value |
|-----------|-------------------------------|-------|
| 1 | 2020-03-12T00:00:00.000+0000 | 4 |
| 1 | 2020-03-12T01:00:00.000+0000 | 2 |
| 2 | 2020-03-12T01:00:00.000+0000 | 3 |
| 2 | 2020-03-12T03:00:00.000+0000 | 4 |
| 1 | 2020-03-12T05:00:00.000+0000 | 4 |
| 3 | 2020-03-12T05:00:00.000+0000 | 2 |
我想创建一个如下所示的新数据框:
| ProductId | Date | Value |
|-----------|-------------------------------|-------|
| 1 | 2020-03-12T00:00:00.000+0000 | 4 |
| 1 | 2020-03-12T01:00:00.000+0000 | 2 |
| 1 | 2020-03-12T02:00:00.000+0000 | 2 |
| 1 | 2020-03-12T03:00:00.000+0000 | 2 |
| 1 | 2020-03-12T04:00:00.000+0000 | 2 |
| 1 | 2020-03-12T05:00:00.000+0000 | 4 |
| 2 | 2020-03-12T01:00:00.000+0000 | 3 |
| 2 | 2020-03-12T02:00:00.000+0000 | 3 |
| 2 | 2020-03-12T03:00:00.000+0000 | 4 |
| 3 | 2020-03-12T05:00:00.000+0000 | 2 |
到目前为止我的代码:
def generate_date_series(start, stop):
start = datetime.strptime(start, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
stop = datetime.strptime(stop, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
return [start + datetime.timedelta(hours=x) for x in range(0, (stop-start).hours + 1)]
spark.udf.register("generate_date_series", generate_date_series, ArrayType(TimestampType()))
df = df.withColumn("max", max(col("Date")).over(Window.partitionBy("ProductId"))) \
.withColumn("min", min(col("Date")).over(Window.partitionBy("ProductId"))) \
.withColumn("Dato", explode(generate_date_series(col("min"), col("max"))) \
.over(Window.partitionBy("ProductId").orderBy(col("Dato").desc())))
window_over_ids = (Window.partitionBy("ProductId").rangeBetween(Window.unboundedPreceding, -1).orderBy("Date"))
df = df.withColumn("Value", last("Value", ignorenulls=True).over(window_over_ids))
错误:
TypeError: strptime() argument 1 must be str, not Column
所以第一个问题显然是如何正确创建和调用 udf,这样我就不会 运行 出现上述错误。
第二个问题是如何完成任务,以便获得我想要的数据帧?
经过一番搜索和试验,我找到了解决方案。我定义了一个 udf returns 两个日期之间的日期范围,间隔为 1 小时。然后我做一个前向填充
我用以下代码解决了这个问题:
def missing_hours(t1, t2):
return [t1 + timedelta(hours=x) for x in range(0, int((t2-t1).total_seconds()/3600))]
missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))
window = Window.partitionBy("ProductId").orderBy("Date")
df_missing = df.withColumn("prev_timestamp", lag(col("Date"), 1, None).over(window)) \
.filter(col("prev_timestamp").isNotNull()) \
.withColumn("Date", explode(missing_hours_udf(col("prev_timestamp"), col("Date")))) \
.withColumn("Value", lit(None)) \
.drop("prev_timestamp")
df = df_original.union(df_missing)
window = Window.partitionBy("ProductId").orderBy("Date") \
.rowsBetween(-sys.maxsize, 0)
# define the forward-filled column
filled_values_column = last(df['Value'], ignorenulls=True).over(window)
# do the fill
df = df.withColumn('Value', filled_values_column)
我有一个包含一些产品、日期和值的数据框。现在,日期在我要填写的记录值之间有不同的差距。这样我从第一次看到产品到最后一次看到每个小时都有一个记录值,如果没有记录我想使用最新值。
所以,我有一个像这样的数据框:
| ProductId | Date | Value |
|-----------|-------------------------------|-------|
| 1 | 2020-03-12T00:00:00.000+0000 | 4 |
| 1 | 2020-03-12T01:00:00.000+0000 | 2 |
| 2 | 2020-03-12T01:00:00.000+0000 | 3 |
| 2 | 2020-03-12T03:00:00.000+0000 | 4 |
| 1 | 2020-03-12T05:00:00.000+0000 | 4 |
| 3 | 2020-03-12T05:00:00.000+0000 | 2 |
我想创建一个如下所示的新数据框:
| ProductId | Date | Value |
|-----------|-------------------------------|-------|
| 1 | 2020-03-12T00:00:00.000+0000 | 4 |
| 1 | 2020-03-12T01:00:00.000+0000 | 2 |
| 1 | 2020-03-12T02:00:00.000+0000 | 2 |
| 1 | 2020-03-12T03:00:00.000+0000 | 2 |
| 1 | 2020-03-12T04:00:00.000+0000 | 2 |
| 1 | 2020-03-12T05:00:00.000+0000 | 4 |
| 2 | 2020-03-12T01:00:00.000+0000 | 3 |
| 2 | 2020-03-12T02:00:00.000+0000 | 3 |
| 2 | 2020-03-12T03:00:00.000+0000 | 4 |
| 3 | 2020-03-12T05:00:00.000+0000 | 2 |
到目前为止我的代码:
def generate_date_series(start, stop):
start = datetime.strptime(start, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
stop = datetime.strptime(stop, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
return [start + datetime.timedelta(hours=x) for x in range(0, (stop-start).hours + 1)]
spark.udf.register("generate_date_series", generate_date_series, ArrayType(TimestampType()))
df = df.withColumn("max", max(col("Date")).over(Window.partitionBy("ProductId"))) \
.withColumn("min", min(col("Date")).over(Window.partitionBy("ProductId"))) \
.withColumn("Dato", explode(generate_date_series(col("min"), col("max"))) \
.over(Window.partitionBy("ProductId").orderBy(col("Dato").desc())))
window_over_ids = (Window.partitionBy("ProductId").rangeBetween(Window.unboundedPreceding, -1).orderBy("Date"))
df = df.withColumn("Value", last("Value", ignorenulls=True).over(window_over_ids))
错误:
TypeError: strptime() argument 1 must be str, not Column
所以第一个问题显然是如何正确创建和调用 udf,这样我就不会 运行 出现上述错误。
第二个问题是如何完成任务,以便获得我想要的数据帧?
经过一番搜索和试验,我找到了解决方案。我定义了一个 udf returns 两个日期之间的日期范围,间隔为 1 小时。然后我做一个前向填充
我用以下代码解决了这个问题:
def missing_hours(t1, t2):
return [t1 + timedelta(hours=x) for x in range(0, int((t2-t1).total_seconds()/3600))]
missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))
window = Window.partitionBy("ProductId").orderBy("Date")
df_missing = df.withColumn("prev_timestamp", lag(col("Date"), 1, None).over(window)) \
.filter(col("prev_timestamp").isNotNull()) \
.withColumn("Date", explode(missing_hours_udf(col("prev_timestamp"), col("Date")))) \
.withColumn("Value", lit(None)) \
.drop("prev_timestamp")
df = df_original.union(df_missing)
window = Window.partitionBy("ProductId").orderBy("Date") \
.rowsBetween(-sys.maxsize, 0)
# define the forward-filled column
filled_values_column = last(df['Value'], ignorenulls=True).over(window)
# do the fill
df = df.withColumn('Value', filled_values_column)