PySpark - 为组创建特定的时间序列范围
PySpark - Creating Specific Time Series Range for Groups
我有 1000 个 Tags
,有一个 Timestamp
和一个 Value
。对于 Tags
中的每一个,日期范围都是“2020-01-01”,但是每个标签的数据太多了。我有一个单独的数据框,第一个数据框中的每个标签都有一个 Start
和 End
。
我只需要上述数据中包含1000个标签数据的日期范围内的数据。我还需要在 Start
日期前 2 天和 End
日期后 1 天填充所需数据框中的时间序列数据。
df1 = spark.createDataFrame(
[("Tag 1", "2020-05-01", 1), ("Tag 1000", "2021-02-01", 1),
("Tag 1", "2020-05-02", 2), ("Tag 1000", "2021-02-02", 2),
("Tag 1", "2020-05-03", 3), ("Tag 1000", "2021-02-03", 3),
("Tag 1", "2020-05-04", 4), ("Tag 1000", "2021-02-04", 4),
("Tag 1", "2020-05-05", 5), ("Tag 1000", "2021-02-05", 5),
("Tag 1", "2020-05-06", 6), ("Tag 1000", "2021-02-06", 6)],
["Tag", "Timestamp", "Value"])
df2 = spark.createDataFrame(
[("Tag 1", "2020-05-02", "2020-05-03"), ("Tag 1000", "2021-02-03", "2021-02-04")],
["Tag", "Start", "End"])
所需的数据帧:
print(df1)
Tag Timestamp Value
Tag 1 2020-05-01 1
Tag 1 2020-05-02 2
Tag 1 2020-05-03 3
Tag 1 2020-05-04 4 #Notice day 5 and 6 are not in the df
Tag 1000 2020-02-01 1
Tag 1000 2020-02-02 2
Tag 1000 2020-02-03 3
Tag 1000 2020-02-04 4
Tag 1000 2020-02-05 5 #Notice day 6 are not in the df
这样做只会根据第二个数据框提供我需要的日期,并且会消除 1,000,000 行不会被分析的行。
到目前为止我所理解的是创建 window.
w = Window().partitionBy("Tag").orderBy("Timestamp")
您需要将列 timestamp、start 和 end 转换为 DateType
首先使用 to_date
函数,使用 date_add
添加填充天数,最后加入两个数据帧,其中 timestamp 列的日期在填充的 start[ 之间=22=] 和 end
from pyspark.sql.functions import col, to_date, date_add
# convert to DateType
df1 = df1.withColumn('timestamp', to_date(col('Timestamp'), "yyyy-MM-dd"))
# convert to DateType then add padding days
df2 = (df2
.withColumn("start_date", date_add(to_date(col('Start'), 'yyyy-MM-dd'), -2))
.withColumn("end_date", date_add(to_date(col("End"), 'yyyy-MM-dd'), 1)))
df1 = df1.join(df2.withColumnRenamed('Tag', 'Tag2'),
[col('Tag') == col('Tag2'), col('timestamp').between(col('start_date'), col('end_date'))],
'left_semi')
df1.show()
+--------+----------+-----+
| Tag| timestamp|Value|
+--------+----------+-----+
|Tag 1000|2021-02-01| 1|
|Tag 1000|2021-02-02| 2|
|Tag 1000|2021-02-03| 3|
|Tag 1000|2021-02-04| 4|
|Tag 1000|2021-02-05| 5|
| Tag 1|2020-05-01| 1|
| Tag 1|2020-05-02| 2|
| Tag 1|2020-05-03| 3|
| Tag 1|2020-05-04| 4|
+--------+----------+-----+
我有 1000 个 Tags
,有一个 Timestamp
和一个 Value
。对于 Tags
中的每一个,日期范围都是“2020-01-01”,但是每个标签的数据太多了。我有一个单独的数据框,第一个数据框中的每个标签都有一个 Start
和 End
。
我只需要上述数据中包含1000个标签数据的日期范围内的数据。我还需要在 Start
日期前 2 天和 End
日期后 1 天填充所需数据框中的时间序列数据。
df1 = spark.createDataFrame(
[("Tag 1", "2020-05-01", 1), ("Tag 1000", "2021-02-01", 1),
("Tag 1", "2020-05-02", 2), ("Tag 1000", "2021-02-02", 2),
("Tag 1", "2020-05-03", 3), ("Tag 1000", "2021-02-03", 3),
("Tag 1", "2020-05-04", 4), ("Tag 1000", "2021-02-04", 4),
("Tag 1", "2020-05-05", 5), ("Tag 1000", "2021-02-05", 5),
("Tag 1", "2020-05-06", 6), ("Tag 1000", "2021-02-06", 6)],
["Tag", "Timestamp", "Value"])
df2 = spark.createDataFrame(
[("Tag 1", "2020-05-02", "2020-05-03"), ("Tag 1000", "2021-02-03", "2021-02-04")],
["Tag", "Start", "End"])
所需的数据帧:
print(df1)
Tag Timestamp Value
Tag 1 2020-05-01 1
Tag 1 2020-05-02 2
Tag 1 2020-05-03 3
Tag 1 2020-05-04 4 #Notice day 5 and 6 are not in the df
Tag 1000 2020-02-01 1
Tag 1000 2020-02-02 2
Tag 1000 2020-02-03 3
Tag 1000 2020-02-04 4
Tag 1000 2020-02-05 5 #Notice day 6 are not in the df
这样做只会根据第二个数据框提供我需要的日期,并且会消除 1,000,000 行不会被分析的行。
到目前为止我所理解的是创建 window.
w = Window().partitionBy("Tag").orderBy("Timestamp")
您需要将列 timestamp、start 和 end 转换为 DateType
首先使用 to_date
函数,使用 date_add
添加填充天数,最后加入两个数据帧,其中 timestamp 列的日期在填充的 start[ 之间=22=] 和 end
from pyspark.sql.functions import col, to_date, date_add
# convert to DateType
df1 = df1.withColumn('timestamp', to_date(col('Timestamp'), "yyyy-MM-dd"))
# convert to DateType then add padding days
df2 = (df2
.withColumn("start_date", date_add(to_date(col('Start'), 'yyyy-MM-dd'), -2))
.withColumn("end_date", date_add(to_date(col("End"), 'yyyy-MM-dd'), 1)))
df1 = df1.join(df2.withColumnRenamed('Tag', 'Tag2'),
[col('Tag') == col('Tag2'), col('timestamp').between(col('start_date'), col('end_date'))],
'left_semi')
df1.show()
+--------+----------+-----+
| Tag| timestamp|Value|
+--------+----------+-----+
|Tag 1000|2021-02-01| 1|
|Tag 1000|2021-02-02| 2|
|Tag 1000|2021-02-03| 3|
|Tag 1000|2021-02-04| 4|
|Tag 1000|2021-02-05| 5|
| Tag 1|2020-05-01| 1|
| Tag 1|2020-05-02| 2|
| Tag 1|2020-05-03| 3|
| Tag 1|2020-05-04| 4|
+--------+----------+-----+