Pyspark 用户定义的列聚合计算
Pyspark user defined aggregate calculation on columns
我正在为 Pyspark 中的分类器准备输入数据。我一直在使用 SparkSQL 中的聚合函数来提取平均值和方差等特征。这些按 activity、名称和 window 分组。 Window 已通过将 unix 时间戳除以 10000 计算为 10 秒时间 windows。
sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window")
结果看起来像
Activity Name Window AvgX VarX
Walk accelerometer 95875 2.0 1.0
我现在要做的,是计算X中每个点的平均斜率。
为此我需要时间戳、window 和 X。我已经在 Python 中使用数组实现了逻辑,这就是它的样子——计算每个点之间的斜率,然后得到平均斜率。理想情况下,我想在 Pyspark 尚不支持的 UDAF 中执行此操作。 (它看起来像这样,假设下面的函数被称为斜率。然后在 sql 你可以做 slope(timestamp, X) as avgSlopeX
编辑 - 更改了输入,使其更清晰。
所以,我所做的就是计算每个点之间的斜率,然后返回 window 中斜率的平均值。所以,当我得到每个 window 的平均值和方差时,我还想得到平均斜率。
#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]
values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02]
i = 0;
slope = 0.0;
totalSlope = 0.0;
while (i < len(timestamp) - 1):
y2 = values[i+1];
y1 = values[i];
x2 = timestamp[i + 1];
x1 = timestamp[i];
slope = ((y2-y1)/(x2-x1));
totalSlope = totalSlope + slope;
i=i+1
avgSlope = (totalSlope/len(x_values))
我该如何实施?我应该尝试转换为 pandas 数据帧然后转换为 numpy 数组吗?如果是这样,我如何才能确保数据仍能正确映射,记住 sql 查询中的 GROUP BY activity、名称 window。
通常这不是 UDAF 的工作,因为 UDAF 不提供任何定义顺序的方法。看来您真正需要的是 window 函数和标准聚合的某种组合。
from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window
df = ...
## DataFrame[activity: string, name: string, window: bigint,
## timestamp: bigint, value: float]
group = ["activity", "name", "window"]
w = (Window()
.partitionBy(*group)
.orderBy("timestamp"))
v_diff = col("value") - lag("value", 1).over(w)
t_diff = col("timestamp") - lag("timestamp", 1).over(w)
slope = v_diff / t_diff
df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope")))
我正在为 Pyspark 中的分类器准备输入数据。我一直在使用 SparkSQL 中的聚合函数来提取平均值和方差等特征。这些按 activity、名称和 window 分组。 Window 已通过将 unix 时间戳除以 10000 计算为 10 秒时间 windows。
sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window")
结果看起来像
Activity Name Window AvgX VarX
Walk accelerometer 95875 2.0 1.0
我现在要做的,是计算X中每个点的平均斜率。
为此我需要时间戳、window 和 X。我已经在 Python 中使用数组实现了逻辑,这就是它的样子——计算每个点之间的斜率,然后得到平均斜率。理想情况下,我想在 Pyspark 尚不支持的 UDAF 中执行此操作。 (它看起来像这样,假设下面的函数被称为斜率。然后在 sql 你可以做 slope(timestamp, X) as avgSlopeX
编辑 - 更改了输入,使其更清晰。 所以,我所做的就是计算每个点之间的斜率,然后返回 window 中斜率的平均值。所以,当我得到每个 window 的平均值和方差时,我还想得到平均斜率。
#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]
values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02]
i = 0;
slope = 0.0;
totalSlope = 0.0;
while (i < len(timestamp) - 1):
y2 = values[i+1];
y1 = values[i];
x2 = timestamp[i + 1];
x1 = timestamp[i];
slope = ((y2-y1)/(x2-x1));
totalSlope = totalSlope + slope;
i=i+1
avgSlope = (totalSlope/len(x_values))
我该如何实施?我应该尝试转换为 pandas 数据帧然后转换为 numpy 数组吗?如果是这样,我如何才能确保数据仍能正确映射,记住 sql 查询中的 GROUP BY activity、名称 window。
通常这不是 UDAF 的工作,因为 UDAF 不提供任何定义顺序的方法。看来您真正需要的是 window 函数和标准聚合的某种组合。
from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window
df = ...
## DataFrame[activity: string, name: string, window: bigint,
## timestamp: bigint, value: float]
group = ["activity", "name", "window"]
w = (Window()
.partitionBy(*group)
.orderBy("timestamp"))
v_diff = col("value") - lag("value", 1).over(w)
t_diff = col("timestamp") - lag("timestamp", 1).over(w)
slope = v_diff / t_diff
df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope")))