PySpark - 为组创建特定的时间序列范围

PySpark - Creating Specific Time Series Range for Groups

我有 1000 个 Tags,有一个 Timestamp 和一个 Value。对于 Tags 中的每一个,日期范围都是“2020-01-01”,但是每个标签的数据太多了。我有一个单独的数据框,第一个数据框中的每个标签都有一个 StartEnd

我只需要上述数据中包含1000个标签数据的日期范围内的数据。我还需要在 Start 日期前 2 天和 End 日期后 1 天填充所需数据框中的时间序列数据。

df1 = spark.createDataFrame(
    [("Tag 1", "2020-05-01", 1), ("Tag 1000", "2021-02-01", 1),
        ("Tag 1", "2020-05-02", 2), ("Tag 1000", "2021-02-02", 2),
        ("Tag 1", "2020-05-03", 3), ("Tag 1000", "2021-02-03", 3),
        ("Tag 1", "2020-05-04", 4), ("Tag 1000", "2021-02-04", 4),
        ("Tag 1", "2020-05-05", 5), ("Tag 1000", "2021-02-05", 5),
        ("Tag 1", "2020-05-06", 6), ("Tag 1000", "2021-02-06", 6)],
    ["Tag", "Timestamp", "Value"])

df2 = spark.createDataFrame(
    [("Tag 1", "2020-05-02", "2020-05-03"), ("Tag 1000", "2021-02-03", "2021-02-04")],
    ["Tag", "Start", "End"])

所需的数据帧:

print(df1)

Tag       Timestamp  Value
Tag 1     2020-05-01 1
Tag 1     2020-05-02 2
Tag 1     2020-05-03 3
Tag 1     2020-05-04 4       #Notice day 5 and 6 are not in the df
Tag 1000  2020-02-01 1
Tag 1000  2020-02-02 2
Tag 1000  2020-02-03 3
Tag 1000  2020-02-04 4
Tag 1000  2020-02-05 5       #Notice day 6 are not in the df

这样做只会根据第二个数据框提供我需要的日期,并且会消除 1,000,000 行不会被分析的行。

到目前为止我所理解的是创建 window.

w = Window().partitionBy("Tag").orderBy("Timestamp")

您需要将列 timestampstartend 转换为 DateType 首先使用 to_date 函数,使用 date_add 添加填充天数,最后加入两个数据帧,其中 timestamp 列的日期在填充的 start[ 之间=22=] 和 end

from pyspark.sql.functions import col, to_date, date_add

# convert to DateType
df1 = df1.withColumn('timestamp', to_date(col('Timestamp'), "yyyy-MM-dd"))
# convert to DateType then add padding days
df2 = (df2
       .withColumn("start_date", date_add(to_date(col('Start'), 'yyyy-MM-dd'), -2))
       .withColumn("end_date", date_add(to_date(col("End"), 'yyyy-MM-dd'), 1)))

df1 = df1.join(df2.withColumnRenamed('Tag', 'Tag2'),
               [col('Tag') == col('Tag2'), col('timestamp').between(col('start_date'), col('end_date'))],
               'left_semi')
df1.show()

+--------+----------+-----+
|     Tag| timestamp|Value|
+--------+----------+-----+
|Tag 1000|2021-02-01|    1|
|Tag 1000|2021-02-02|    2|
|Tag 1000|2021-02-03|    3|
|Tag 1000|2021-02-04|    4|
|Tag 1000|2021-02-05|    5|
|   Tag 1|2020-05-01|    1|
|   Tag 1|2020-05-02|    2|
|   Tag 1|2020-05-03|    3|
|   Tag 1|2020-05-04|    4|
+--------+----------+-----+