How to interpolate time series based on time gap between non null values in PySpark

I want to interpolate time series data. The challenge is that values should only be interpolated when the time gap between the existing values is not larger than a specified limit.

Input data

from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.memory", "60g").getOrCreate()

df = spark.createDataFrame([{'timestamp': 1642205833225, 'value': 58.00},
                            {'timestamp': 1642205888654, 'value': float('nan')},
                            {'timestamp': 1642205899657, 'value': float('nan')},
                            {'timestamp': 1642205892970, 'value': 55.00},
                            {'timestamp': 1642206338180, 'value': float('nan')},
                            {'timestamp': 1642206353652, 'value': 56.45},
                            {'timestamp': 1642206853451, 'value': float('nan')},
                            {'timestamp': 1642207353652, 'value': 80.45}
                            ])
df.show()

+-------------+-----+
|    timestamp|value|
+-------------+-----+
|1642205833225| 58.0|
|1642205888654|  NaN|
|1642205899657|  NaN|
|1642205892970| 55.0|
|1642206338180|  NaN|
|1642206353652|56.45|
|1642206853451|  NaN|
|1642207353652|80.45|
+-------------+-----+

First, I want to compute the time gap to the next existing (non-null) value, i.e. `next_timestamp - current_timestamp`; for example, 460682 = 1642206353652 - 1642205892970 for the row with value 55.0.

+-------------+-----+---------------+
|    timestamp|value|timegap_to_next|
+-------------+-----+---------------+
|1642205833225| 58.0|          59745|
|1642205888654|  NaN|            NaN|
|1642205899657|  NaN|            NaN|
|1642205892970| 55.0|         460682|
|1642206338180|  NaN|            NaN|
|1642206353652|56.45|        1000000|
|1642206853451|  NaN|            NaN|
|1642207353652|80.45|            NaN|
+-------------+-----+---------------+

The interpolation should then be performed based on the computed time gap. In this case the threshold is 500000: the gaps 59745 and 460682 are below it, so the NaN rows they span get interpolated, while 1000000 is above it, so the NaN at 1642206853451 stays NaN.

Final output:

+-------------+-----+---------------+
|    timestamp|value|timegap_to_next|
+-------------+-----+---------------+
|1642205833225| 58.0|          59745|
|1642205888654| 57.0|            NaN|
|1642205899657| 56.0|            NaN|
|1642205892970| 55.0|         460682|
|1642206338180|55.75|            NaN|
|1642206353652|56.45|        1000000|
|1642206853451|  NaN|            NaN|
|1642207353652|80.45|            NaN|
+-------------+-----+---------------+
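To make the rule explicit, here is roughly the logic I'm after, sketched in plain Python outside of Spark (the values in my expected output are rounded by hand, so the exact decimals may differ):

import math

def interpolate(rows, threshold=500000):
    # rows: list of (timestamp, value) tuples sorted by timestamp
    out = list(rows)
    # indices of the existing (non-NaN) values
    anchors = [i for i, (_, v) in enumerate(rows) if not math.isnan(v)]
    for a, b in zip(anchors, anchors[1:]):
        (t0, v0), (t1, v1) = rows[a], rows[b]
        if t1 - t0 >= threshold:
            continue  # gap too large: leave the NaNs in between untouched
        for i in range(a + 1, b):
            t = rows[i][0]
            out[i] = (t, v0 + (v1 - v0) * (t - t0) / (t1 - t0))
    return out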

Can anyone help me with this special case? That would be great!

With this input dataframe:

df = spark.createDataFrame([
    (1642205833225, 58.00), (1642205888654, float('nan')),
    (1642205899657, float('nan')), (1642205899970, 55.00),
    (1642206338180, float('nan')), (1642206353652, 56.45),
    (1642206853451, float('nan')), (1642207353652, 80.45)
], ["timestamp", "value"])

# replace NaN values with nulls
df = df.replace(float("nan"), None, ["value"])
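
This replacement matters because Spark treats NaN and null differently: NaN is a regular float value, while null is the absence of a value, and the ignorenulls flag of first/last skips only the latter. A quick sanity check on the dataframe above:

from pyspark.sql import functions as F

# after the replace there should be no NaNs left, only nulls
df.select(
    F.count(F.when(F.isnan("value"), 1)).alias("nan_count"),          # expected: 0
    F.count(F.when(F.col("value").isNull(), 1)).alias("null_count"),  # expected: 4
).show()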

You can use window functions (`last`, `first`) to get the next and previous non-null values for each row, together with their row numbers, and to compute the time gap, like this:

from pyspark.sql import functions as F, Window

# w1 spans all rows after the current one, w2 all rows before it
w1 = Window.orderBy("timestamp").rowsBetween(1, Window.unboundedFollowing)
w2 = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, -1)

df = (
    df.withColumn("rn", F.row_number().over(Window.orderBy("timestamp")))
    # next and previous non-null value, plus the row number they sit on
    .withColumn("next_val", F.first("value", ignorenulls=True).over(w1))
    .withColumn("next_rn", F.first(F.when(F.col("value").isNotNull(), F.col("rn")), ignorenulls=True).over(w1))
    .withColumn("prev_val", F.last("value", ignorenulls=True).over(w2))
    .withColumn("prev_rn", F.last(F.when(F.col("value").isNotNull(), F.col("rn")), ignorenulls=True).over(w2))
    # for non-null rows: timestamp of the next non-null value minus the current timestamp
    .withColumn("timegap_to_next", F.when(F.col("value").isNotNull(), F.min(F.when(F.col("value").isNotNull(), F.col("timestamp"))).over(w1) - F.col("timestamp")))
)
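
If you want to check the intermediate result, you can show the helper columns before interpolating:

# optional: inspect the helper columns used for the interpolation
df.select(
    "timestamp", "value", "rn",
    "prev_val", "prev_rn", "next_val", "next_rn",
    "timegap_to_next",
).show()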

Now you can linearly interpolate the `value` column with a `when` expression based on the threshold. `last("timegap_to_next", ignorenulls=True)` carries each non-null row's time gap forward onto the null rows that follow it, so every null row sees the gap of the segment it belongs to:

w3 = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn(
    "value",
    F.coalesce(
        "value",
        F.when(
            F.last("timegap_to_next", ignorenulls=True).over(w3) < 500000,
            # linear interpolation over the row number between the two
            # surrounding non-null rows
            F.col("prev_val")
            + (F.col("next_val") - F.col("prev_val"))
            / (F.col("next_rn") - F.col("prev_rn"))
            * (F.col("rn") - F.col("prev_rn")),
        ),
    ),
).select("timestamp", "value", "timegap_to_next")

df.show()

#+-------------+------+---------------+
#|    timestamp| value|timegap_to_next|
#+-------------+------+---------------+
#|1642205833225|  58.0|          66745|
#|1642205888654|  57.0|           null|
#|1642205899657|  56.0|           null|
#|1642205899970|  55.0|         453682|
#|1642206338180|55.725|           null|
#|1642206353652| 56.45|        1000000|
#|1642206853451|  null|           null|
#|1642207353652| 80.45|           null|
#+-------------+------+---------------+
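
Note that the formula interpolates over the row number (`rn`), so the filled values are evenly spaced between the two anchors, which is what the expected output in the question shows. If you would rather weight by the actual timestamps, a sketch of that variant could look like this (my assumption, reusing `prev_val`/`next_val` and the windows `w1`/`w2` from the first step, i.e. before the final `select` drops the helper columns):

# sketch: timestamp-weighted interpolation (assumed variant, not the
# row-number version above); prev_ts/next_ts are the timestamps of the
# surrounding non-null values
df2 = (
    df.withColumn("next_ts", F.first(F.when(F.col("value").isNotNull(), F.col("timestamp")), ignorenulls=True).over(w1))
    .withColumn("prev_ts", F.last(F.when(F.col("value").isNotNull(), F.col("timestamp")), ignorenulls=True).over(w2))
    .withColumn(
        "value",
        F.coalesce(
            "value",
            F.when(
                F.col("next_ts") - F.col("prev_ts") < 500000,
                F.col("prev_val")
                + (F.col("next_val") - F.col("prev_val"))
                / (F.col("next_ts") - F.col("prev_ts"))
                * (F.col("timestamp") - F.col("prev_ts")),
            ),
        ),
    )
)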