How to interpolate time series based on time gap between non-null values in PySpark
I want to interpolate time series data. The challenge is that the interpolation should only be done when the time gap between the existing values is not larger than a specified limit.

Input data:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.memory", "60g").getOrCreate()
df = spark.createDataFrame([{'timestamp': 1642205833225, 'value': 58.00},
{'timestamp': 1642205888654, 'value': float('nan')},
{'timestamp': 1642205899657, 'value': float('nan')},
{'timestamp': 1642205892970, 'value': 55.00},
{'timestamp': 1642206338180, 'value': float('nan')},
{'timestamp': 1642206353652, 'value': 56.45},
{'timestamp': 1642206853451, 'value': float('nan')},
{'timestamp': 1642207353652, 'value': 80.45}
])
df.show()
+-------------+-----+
|    timestamp|value|
+-------------+-----+
|1642205833225| 58.0|
|1642205888654|  NaN|
|1642205899657|  NaN|
|1642205892970| 55.0|
|1642206338180|  NaN|
|1642206353652|56.45|
|1642206853451|  NaN|
|1642207353652|80.45|
+-------------+-----+
First I want to calculate the time gap to the next existing value (next non-null timestamp minus the current timestamp):
+-------------+-----+---------------+
|    timestamp|value|timegap_to_next|
+-------------+-----+---------------+
|1642205833225| 58.0|          59745|
|1642205888654|  NaN|            NaN|
|1642205899657|  NaN|            NaN|
|1642205892970| 55.0|         460682|
|1642206338180|  NaN|            NaN|
|1642206353652|56.45|        1000000|
|1642206853451|  NaN|            NaN|
|1642207353652|80.45|            NaN|
+-------------+-----+---------------+
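For example, the first gap is 1642205892970 - 1642205833225 = 59745 ms, measured from one non-null value to the next non-null value.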
Based on the calculated time gap, the interpolation should be done. In this case the threshold is 500000.

Final output:
+-------------+-----+---------------+
|    timestamp|value|timegap_to_next|
+-------------+-----+---------------+
|1642205833225| 58.0|          59745|
|1642205888654| 57.0|            NaN|
|1642205899657| 56.0|            NaN|
|1642205892970| 55.0|         460682|
|1642206338180|55.75|            NaN|
|1642206353652|56.45|        1000000|
|1642206853451|  NaN|            NaN|
|1642207353652|80.45|            NaN|
+-------------+-----+---------------+
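To pin down the rule, here is a minimal plain-Python sketch of the intended behavior (the function name is mine; missing values are filled in equal steps between the two bounding known values, which reproduces 57.0, 56.0 and 55.725 above, with 55.75 presumably rounded, while the gap check itself uses the timestamps):

def interpolate_bounded(points, max_gap=500000):
    """points: list of (timestamp, value_or_None) in row order.
    Fill None values in equal steps between the bounding known values,
    but only if those two known values are at most max_gap apart in time."""
    known = [(i, ts, v) for i, (ts, v) in enumerate(points) if v is not None]
    out = list(points)
    # walk over consecutive pairs of known points
    for (i0, t0, v0), (i1, t1, v1) in zip(known, known[1:]):
        if t1 - t0 > max_gap:
            continue  # time gap too large: leave the missing rows as-is
        step = (v1 - v0) / (i1 - i0)
        for j in range(i0 + 1, i1):
            out[j] = (points[j][0], v0 + step * (j - i0))
    return out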
Can anyone help me solve this special case? That would be great!
With this input dataframe (note the fourth timestamp is 1642205899970 here, so the rows stay ordered by timestamp):
df = spark.createDataFrame([
(1642205833225, 58.00), (1642205888654, float('nan')),
(1642205899657, float('nan')), (1642205899970, 55.00),
(1642206338180, float('nan')), (1642206353652, 56.45),
(1642206853451, float('nan')), (1642207353652, 80.45)
], ["timestamp", "value"])
# replace NaN values with nulls
df = df.replace(float("nan"), None, ["value"])
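This replacement matters because Spark treats NaN as an ordinary double value: first/last with ignorenulls=True skip only true nulls, not NaN. A quick sanity check (illustrative):

from pyspark.sql import functions as F

df.where(F.col("value").isNull()).count()  # 4: the former NaN rows
df.where(F.isnan("value")).count()         # 0: no NaN left in the column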
You can use window functions (last, first) to get the next and previous non-null value for each row and to calculate the time gap, like this:
from pyspark.sql import functions as F, Window

# w1 looks at the rows strictly after the current one, w2 strictly before
w1 = Window.orderBy("timestamp").rowsBetween(1, Window.unboundedFollowing)
w2 = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, -1)

df = (
    df.withColumn("rn", F.row_number().over(Window.orderBy("timestamp")))  # row index, used as interpolation axis
    .withColumn("next_val", F.first("value", ignorenulls=True).over(w1))  # next non-null value
    .withColumn("next_rn", F.first(F.when(F.col("value").isNotNull(), F.col("rn")), ignorenulls=True).over(w1))  # its row index
    .withColumn("prev_val", F.last("value", ignorenulls=True).over(w2))  # previous non-null value
    .withColumn("prev_rn", F.last(F.when(F.col("value").isNotNull(), F.col("rn")), ignorenulls=True).over(w2))  # its row index
    .withColumn("timegap_to_next", F.when(F.col("value").isNotNull(), F.min(F.when(F.col("value").isNotNull(), F.col("timestamp"))).over(w1) - F.col("timestamp")))
)
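Note that timegap_to_next is only set on the non-null rows. The null rows sitting between two known values pick that gap up in the next step through last("timegap_to_next", ignorenulls=True), so the threshold check applies to the whole segment they belong to.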
Now, you can use a when expression to linearly interpolate the value column depending on the threshold:
w3 = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn(
    "value",
    F.coalesce(
        "value",
        F.when(
            # the null rows inherit the gap of the previous non-null row
            F.last("timegap_to_next", ignorenulls=True).over(w3) < 500000,
            # equal-step linear interpolation along the row index
            F.col("prev_val")
            + (F.col("next_val") - F.col("prev_val"))
            / (F.col("next_rn") - F.col("prev_rn"))
            * (F.col("rn") - F.col("prev_rn"))
        )
    )
).select("timestamp", "value", "timegap_to_next")
df.show()
#+-------------+------+---------------+
#|    timestamp| value|timegap_to_next|
#+-------------+------+---------------+
#|1642205833225|  58.0|          66745|
#|1642205888654|  57.0|           null|
#|1642205899657|  56.0|           null|
#|1642205899970|  55.0|         453682|
#|1642206338180|55.725|           null|
#|1642206353652| 56.45|        1000000|
#|1642206853451|  null|           null|
#|1642207353652| 80.45|           null|
#+-------------+------+---------------+
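If you need this for several frames or thresholds, the two steps combine naturally into one function. A minimal sketch, assuming the input already uses nulls (not NaN) for the gaps; the name interpolate_with_gap_limit and its signature are mine, not part of the answer:

from pyspark.sql import functions as F, Window

def interpolate_with_gap_limit(df, threshold, ts="timestamp", val="value"):
    """Hypothetical helper bundling the two steps above: fill null values of
    `val` by equal steps between the bounding non-null rows, but only where
    the time gap between those rows is below `threshold`."""
    w_next = Window.orderBy(ts).rowsBetween(1, Window.unboundedFollowing)
    w_prev = Window.orderBy(ts).rowsBetween(Window.unboundedPreceding, -1)
    w_fill = Window.orderBy(ts).rowsBetween(Window.unboundedPreceding, Window.currentRow)
    rn_if_val = F.when(F.col(val).isNotNull(), F.col("rn"))
    ts_if_val = F.when(F.col(val).isNotNull(), F.col(ts))
    df = (
        df.withColumn("rn", F.row_number().over(Window.orderBy(ts)))
        .withColumn("next_val", F.first(val, ignorenulls=True).over(w_next))
        .withColumn("next_rn", F.first(rn_if_val, ignorenulls=True).over(w_next))
        .withColumn("prev_val", F.last(val, ignorenulls=True).over(w_prev))
        .withColumn("prev_rn", F.last(rn_if_val, ignorenulls=True).over(w_prev))
        .withColumn("timegap_to_next",
                    F.when(F.col(val).isNotNull(),
                           F.min(ts_if_val).over(w_next) - F.col(ts)))
    )
    filled = (F.col("prev_val") + (F.col("next_val") - F.col("prev_val"))
              / (F.col("next_rn") - F.col("prev_rn"))
              * (F.col("rn") - F.col("prev_rn")))
    return df.withColumn(
        val,
        F.coalesce(val, F.when(
            F.last("timegap_to_next", ignorenulls=True).over(w_fill) < threshold,
            filled)),
    ).select(ts, val, "timegap_to_next")

# usage, on a fresh frame whose gaps are nulls:
# result = interpolate_with_gap_limit(raw_df, threshold=500000)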