围绕给定值 PySpark 形成值序列
Form value sequence around given values PySpark
我有几个 PySpark 数据帧,其中第二个是从第一个派生的,用于检查异常。
import pandas as pd
from datetime import date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = pd.DataFrame({
"pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
"date": [
date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
],
"varA": [4, 5, 1, 6, 7, 4, 8, 11, 12, 10, 11, 13],
"varB": [5, 6, 4, 3, 12, 13, 1, 14, 9, 10, 11, 15],
"varC": [8, 32, 1, 11, 4, 3, 5, 6, 10, 14, 9, 11]
})
df1 = spark.createDataFrame(df1)
所以第一个数据框是一个“宽”数据框。第二个是“长”数据帧,作为异常处理算法的结果从第一个数据帧派生而来。在这里,每一行都是一个异常。
df2 = do_anomaly_processing(df1)
df2.show()
#+-----+-----------+--------+-----+
#|pk | date|variable|value|
#+-----+-----------+--------+-----+
#| 1| 2022-04-14| varA| 5|
#| 2| 2022-04-14| varA| 4|
#| 3| 2022-04-14| varA| 10|
#| 3| 2022-04-15| varC| 14|
#+-----+-----------+--------+-----+
生成此数据帧的代码是:
df2 = pd.DataFrame({
"pk": [1,2,3,3],
"date": [date(2022,4,14), date(2022,4,14), date(2022,4,14), date(2022,4,15)],
"variable": ["varA", "varA", "varA", "varC"],
"value": [5,4,10,14]
})
df2 = spark.createDataFrame(df2)
我想创建一个新的数据框,其中列出了每个异常的周围值,如下所示:
#+-----+-----------+--------+-----+------+------+
#|pk | date|variable|value|v[n-1]|v[n+1]|
#+-----+-----------+--------+-----+------+------+
#| 1| 2022-04-14| varA| 5| 4| 1|
#| 2| 2022-04-14| varA| 4| 7| 8|
#| 3| 2022-04-14| varA| 10| 12| 11|
#| 3| 2022-04-15| varC| 9| 14| 11|
#+-----+-----------+--------+-----+------+------+
实际上,我希望能够在数据允许的情况下将尽可能多的过去 and/or 未来值放入新数据框中(例如,(v[n-5], v[n-4], v[n-3], v[n-2], v[n-1])
,等等——但总是一个序列)。我最初是用 for
循环来做的:
for step in dt:
if step == 0:
varName = "v[n]"
shiftedDataframe = melt(
df1,
id_vars=["pk", "date"],
value_vars=["varA", "varB", "varC"],
var_name=varName
)
else:
if step < 0:
varName = f"v[n{step}]"
step = abs(step)
elif step > 0:
varName = f"v[n+{step}]"
step = -step
shiftedDataframe = melt(
create_shifted_dataframe(
df1,
"pk",
"date",
["varA", "varB", "varC"],
shiftAmount=step
),
id_vars=["pk", "date"],
value_vars=["varA", "varB", "varC"],
var_name=varName
)
df2 = df2.join(df1, on=["pk", "date", "variable"], how="left")
注意:函数 melt
和 create_shifted_dataframe
完全按照您的想法行事。实际上,我正在创建原始数据框 (df1
) 的移位版本,然后将其强制为“长”数据框,最后,我将 df1
的新移位融化版本合并到 df2
。这是我能找到的最好的方法,但必须有更有效的方法。我正在考虑做一个数据透视表,然后进行合并,但我无法弄清楚如何使用 df2
中的不同日期来执行此操作。无论如何,我希望这是有道理的,并且对某些人来说是一个有趣的问题。
如果我理解其中的逻辑,您可以利用 lead
和 lag
。
但首先我们需要 melt
df1
看起来像这样,
+---+----------+--------+--------------+
| pk| date|variable|original_value|
+---+----------+--------+--------------+
| 1|2022-04-13| varA| 4|
| 1|2022-04-13| varB| 5|
...
如果你有这个数据框,你可以用 df2
加入它。
df = df1.join(df2, on=['pk', 'date', 'variable'], how='left')
然后,可以用lead
(取当前行后offset处的值)和lag
(取当前行前offset处的值)得到v[n-1], v[n+1], v[n-2], v[n+2]...
# I will get until n+-2 in this example.
w = Window.partitionBy(['pk', 'variable']).orderBy('date')
df = (df.select('pk', 'date', 'variable', 'value',
*[F.when(F.col('value').isNotNull(), F.lag('original_value', x).over(w)).alias(f'v[n-{x}]') for x in range(1, 3)],
*[F.when(F.col('value').isNotNull(), F.lead('original_value', x).over(w)).alias(f'v[n+{x}]') for x in range(1, 3)])
.filter(F.col('value').isNotNull()))
我有几个 PySpark 数据帧,其中第二个是从第一个派生的,用于检查异常。
import pandas as pd
from datetime import date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = pd.DataFrame({
"pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
"date": [
date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
],
"varA": [4, 5, 1, 6, 7, 4, 8, 11, 12, 10, 11, 13],
"varB": [5, 6, 4, 3, 12, 13, 1, 14, 9, 10, 11, 15],
"varC": [8, 32, 1, 11, 4, 3, 5, 6, 10, 14, 9, 11]
})
df1 = spark.createDataFrame(df1)
所以第一个数据框是一个“宽”数据框。第二个是“长”数据帧,作为异常处理算法的结果从第一个数据帧派生而来。在这里,每一行都是一个异常。
df2 = do_anomaly_processing(df1)
df2.show()
#+-----+-----------+--------+-----+
#|pk | date|variable|value|
#+-----+-----------+--------+-----+
#| 1| 2022-04-14| varA| 5|
#| 2| 2022-04-14| varA| 4|
#| 3| 2022-04-14| varA| 10|
#| 3| 2022-04-15| varC| 14|
#+-----+-----------+--------+-----+
生成此数据帧的代码是:
df2 = pd.DataFrame({
"pk": [1,2,3,3],
"date": [date(2022,4,14), date(2022,4,14), date(2022,4,14), date(2022,4,15)],
"variable": ["varA", "varA", "varA", "varC"],
"value": [5,4,10,14]
})
df2 = spark.createDataFrame(df2)
我想创建一个新的数据框,其中列出了每个异常的周围值,如下所示:
#+-----+-----------+--------+-----+------+------+
#|pk | date|variable|value|v[n-1]|v[n+1]|
#+-----+-----------+--------+-----+------+------+
#| 1| 2022-04-14| varA| 5| 4| 1|
#| 2| 2022-04-14| varA| 4| 7| 8|
#| 3| 2022-04-14| varA| 10| 12| 11|
#| 3| 2022-04-15| varC| 9| 14| 11|
#+-----+-----------+--------+-----+------+------+
实际上,我希望能够在数据允许的情况下将尽可能多的过去 and/or 未来值放入新数据框中(例如,(v[n-5], v[n-4], v[n-3], v[n-2], v[n-1])
,等等——但总是一个序列)。我最初是用 for
循环来做的:
for step in dt:
if step == 0:
varName = "v[n]"
shiftedDataframe = melt(
df1,
id_vars=["pk", "date"],
value_vars=["varA", "varB", "varC"],
var_name=varName
)
else:
if step < 0:
varName = f"v[n{step}]"
step = abs(step)
elif step > 0:
varName = f"v[n+{step}]"
step = -step
shiftedDataframe = melt(
create_shifted_dataframe(
df1,
"pk",
"date",
["varA", "varB", "varC"],
shiftAmount=step
),
id_vars=["pk", "date"],
value_vars=["varA", "varB", "varC"],
var_name=varName
)
df2 = df2.join(df1, on=["pk", "date", "variable"], how="left")
注意:函数 melt
和 create_shifted_dataframe
完全按照您的想法行事。实际上,我正在创建原始数据框 (df1
) 的移位版本,然后将其强制为“长”数据框,最后,我将 df1
的新移位融化版本合并到 df2
。这是我能找到的最好的方法,但必须有更有效的方法。我正在考虑做一个数据透视表,然后进行合并,但我无法弄清楚如何使用 df2
中的不同日期来执行此操作。无论如何,我希望这是有道理的,并且对某些人来说是一个有趣的问题。
如果我理解其中的逻辑,您可以利用 lead
和 lag
。
但首先我们需要 melt
df1
看起来像这样,
+---+----------+--------+--------------+
| pk| date|variable|original_value|
+---+----------+--------+--------------+
| 1|2022-04-13| varA| 4|
| 1|2022-04-13| varB| 5|
...
如果你有这个数据框,你可以用 df2
加入它。
df = df1.join(df2, on=['pk', 'date', 'variable'], how='left')
然后,可以用lead
(取当前行后offset处的值)和lag
(取当前行前offset处的值)得到v[n-1], v[n+1], v[n-2], v[n+2]...
# I will get until n+-2 in this example.
w = Window.partitionBy(['pk', 'variable']).orderBy('date')
df = (df.select('pk', 'date', 'variable', 'value',
*[F.when(F.col('value').isNotNull(), F.lag('original_value', x).over(w)).alias(f'v[n-{x}]') for x in range(1, 3)],
*[F.when(F.col('value').isNotNull(), F.lead('original_value', x).over(w)).alias(f'v[n+{x}]') for x in range(1, 3)])
.filter(F.col('value').isNotNull()))