围绕给定值 PySpark 形成值序列

Form value sequence around given values PySpark

我有几个 PySpark 数据帧,其中第二个是从第一个派生的,用于检查异常。

import pandas as pd
from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = pd.DataFrame({
    "pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
    "date": [
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),  
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
    ],
    "varA": [4, 5, 1, 6, 7, 4, 8, 11, 12, 10, 11, 13],
    "varB": [5, 6, 4, 3, 12, 13, 1, 14, 9, 10, 11, 15],
    "varC": [8, 32, 1, 11, 4, 3, 5, 6, 10, 14, 9, 11]
})

df1 = spark.createDataFrame(df1)

所以第一个数据框是一个“宽”数据框。第二个是“长”数据帧,作为异常处理算法的结果从第一个数据帧派生而来。在这里,每一行都是一个异常。

df2 = do_anomaly_processing(df1)

df2.show()

#+-----+-----------+--------+-----+
#|pk   |       date|variable|value|
#+-----+-----------+--------+-----+
#|    1| 2022-04-14|    varA|    5|
#|    2| 2022-04-14|    varA|    4|
#|    3| 2022-04-14|    varA|   10|
#|    3| 2022-04-15|    varC|   14|
#+-----+-----------+--------+-----+

生成此数据帧的代码是:

df2 = pd.DataFrame({
    "pk": [1,2,3,3],
    "date": [date(2022,4,14), date(2022,4,14), date(2022,4,14), date(2022,4,15)],
    "variable": ["varA", "varA", "varA", "varC"],
    "value": [5,4,10,14]
})

df2 = spark.createDataFrame(df2)

我想创建一个新的数据框,其中列出了每个异常的周围值,如下所示:

#+-----+-----------+--------+-----+------+------+
#|pk   |       date|variable|value|v[n-1]|v[n+1]|
#+-----+-----------+--------+-----+------+------+
#|    1| 2022-04-14|    varA|    5|     4|     1|
#|    2| 2022-04-14|    varA|    4|     7|     8|
#|    3| 2022-04-14|    varA|   10|    12|    11|
#|    3| 2022-04-15|    varC|    9|    14|    11|
#+-----+-----------+--------+-----+------+------+

实际上,我希望能够在数据允许的情况下将尽可能多的过去 and/or 未来值放入新数据框中(例如,(v[n-5], v[n-4], v[n-3], v[n-2], v[n-1]),等等——但总是一个序列)。我最初是用 for 循环来做的:

for step in dt:
    if step == 0:
        varName = "v[n]"

        shiftedDataframe = melt(
            df1,
            id_vars=["pk", "date"],
            value_vars=["varA", "varB", "varC"],
            var_name=varName
        )
    else:
        if step < 0:
            varName = f"v[n{step}]"
            step = abs(step)
        elif step > 0:
            varName = f"v[n+{step}]"
            step = -step

            shiftedDataframe = melt(
                create_shifted_dataframe(
                    df1,
                    "pk",
                    "date",
                    ["varA", "varB", "varC"],
                    shiftAmount=step
                ),
                id_vars=["pk", "date"],
                value_vars=["varA", "varB", "varC"],
                var_name=varName
            )
    
    df2 = df2.join(df1, on=["pk", "date", "variable"], how="left")

注意:函数 meltcreate_shifted_dataframe 完全按照您的想法行事。实际上,我正在创建原始数据框 (df1) 的移位版本,然后将其强制为“长”数据框,最后,我将 df1 的新移位融化版本合并到 df2。这是我能找到的最好的方法,但必须有更有效的方法。我正在考虑做一个数据透视表,然后进行合并,但我无法弄清楚如何使用 df2 中的不同日期来执行此操作。无论如何,我希望这是有道理的,并且对某些人来说是一个有趣的问题。

如果我理解其中的逻辑,您可以利用 leadlag

但首先我们需要 melt df1 看起来像这样,

+---+----------+--------+--------------+
| pk|      date|variable|original_value|
+---+----------+--------+--------------+
|  1|2022-04-13|    varA|             4|
|  1|2022-04-13|    varB|             5|
...

如果你有这个数据框,你可以用 df2 加入它。

df = df1.join(df2, on=['pk', 'date', 'variable'], how='left')

然后,可以用lead(取当前行后offset处的值)和lag(取当前行前offset处的值)得到v[n-1], v[n+1], v[n-2], v[n+2]...

# I will get until n+-2 in this example.
w = Window.partitionBy(['pk', 'variable']).orderBy('date')
df = (df.select('pk', 'date', 'variable', 'value',
        *[F.when(F.col('value').isNotNull(), F.lag('original_value', x).over(w)).alias(f'v[n-{x}]') for x in range(1, 3)],
        *[F.when(F.col('value').isNotNull(), F.lead('original_value', x).over(w)).alias(f'v[n+{x}]') for x in range(1, 3)])
    .filter(F.col('value').isNotNull()))