pyspark high performance rolling/window aggregations on timeseries data
Basic question
I have a dataset with roughly 10 billion rows. I'm looking for the most performant way to compute rolling/windowed aggregates/metrics (sum, mean, min, max, stddev) over four different time windows (3 days, 7 days, 14 days, 21 days).
Spark/AWS EMR specs
spark version: 2.4.4
ec2 instance type: r5.24xlarge
num core ec2 instances: 10
num pyspark partitions: 600
Overview
I've read a bunch of SO posts that cover either the mechanics of computing rolling statistics or how to make Window functions faster, but none of them combine the two concepts in a way that solves my problem. The options below do what I want, but I need them to run faster on my real dataset, so I'm looking for suggestions that are faster/better.
My dataset is structured like the following, but has roughly 10 billion rows:
+--------------------------+----+-----+
|date |name|value|
+--------------------------+----+-----+
|2020-12-20 17:45:19.536796|1 |5 |
|2020-12-21 17:45:19.53683 |1 |105 |
|2020-12-22 17:45:19.536846|1 |205 |
|2020-12-23 17:45:19.536861|1 |305 |
|2020-12-24 17:45:19.536875|1 |405 |
|2020-12-25 17:45:19.536891|1 |505 |
|2020-12-26 17:45:19.536906|1 |605 |
|2020-12-20 17:45:19.536796|2 |10 |
|2020-12-21 17:45:19.53683 |2 |110 |
|2020-12-22 17:45:19.536846|2 |210 |
|2020-12-23 17:45:19.536861|2 |310 |
|2020-12-24 17:45:19.536875|2 |410 |
|2020-12-25 17:45:19.536891|2 |510 |
|2020-12-26 17:45:19.536906|2 |610 |
|2020-12-20 17:45:19.536796|3 |15 |
|2020-12-21 17:45:19.53683 |3 |115 |
|2020-12-22 17:45:19.536846|3 |215 |
I need my dataset to look like the following. Note: the stats for the 7-day window are shown, but I also need the other three windows.
+--------------------------+----+-----+----+-----+---+---+------------------+
|date |name|value|sum |mean |min|max|stddev |
+--------------------------+----+-----+----+-----+---+---+------------------+
|2020-12-20 17:45:19.536796|1 |5 |5 |5.0 |5 |5 |NaN |
|2020-12-21 17:45:19.53683 |1 |105 |110 |55.0 |5 |105|70.71067811865476 |
|2020-12-22 17:45:19.536846|1 |205 |315 |105.0|5 |205|100.0 |
|2020-12-23 17:45:19.536861|1 |305 |620 |155.0|5 |305|129.09944487358058|
|2020-12-24 17:45:19.536875|1 |405 |1025|205.0|5 |405|158.11388300841898|
|2020-12-25 17:45:19.536891|1 |505 |1530|255.0|5 |505|187.08286933869707|
|2020-12-26 17:45:19.536906|1 |605 |2135|305.0|5 |605|216.02468994692867|
|2020-12-20 17:45:19.536796|2 |10 |10 |10.0 |10 |10 |NaN |
|2020-12-21 17:45:19.53683 |2 |110 |120 |60.0 |10 |110|70.71067811865476 |
|2020-12-22 17:45:19.536846|2 |210 |330 |110.0|10 |210|100.0 |
|2020-12-23 17:45:19.536861|2 |310 |640 |160.0|10 |310|129.09944487358058|
|2020-12-24 17:45:19.536875|2 |410 |1050|210.0|10 |410|158.11388300841898|
|2020-12-25 17:45:19.536891|2 |510 |1560|260.0|10 |510|187.08286933869707|
|2020-12-26 17:45:19.536906|2 |610 |2170|310.0|10 |610|216.02468994692867|
|2020-12-20 17:45:19.536796|3 |15 |15 |15.0 |15 |15 |NaN |
|2020-12-21 17:45:19.53683 |3 |115 |130 |65.0 |15 |115|70.71067811865476 |
|2020-12-22 17:45:19.536846|3 |215 |345 |115.0|15 |215|100.0 |
Details
For readability I'll only do a single window in these examples. Things I've tried:
- basic Window().over() syntax
- converting the windowed values into an array column and using higher-order functions
- Spark SQL
Setup
import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType
import pandas as pd
import numpy as np
spark = SparkSession.builder.appName('example').getOrCreate()
# create spark dataframe
n = 7
names = [1, 2, 3]
date_list = [datetime.datetime.today() - datetime.timedelta(days=(n-x)) for x in range(n)]
values = [x*100 for x in range(n)]
rows = []
for name in names:
    for d, v in zip(date_list, values):
        rows.append(
            {
                "name": name,
                "date": d,
                "value": v+(5*name)
            }
        )
df = spark.createDataFrame(data=rows)
# setup window
window_days = 7
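# note: the frame below is expressed in seconds, because orderBy casts the timestamp to a long (unix seconds)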
window = (
Window
.partitionBy(F.col("name"))
.orderBy(F.col("date").cast("timestamp").cast("long"))
.rangeBetween(-window_days * 60 * 60 * 24 + 1, Window.currentRow)
)
1. Basic
This creates multiple window specs, as shown here, and is therefore executed serially, which makes it very slow on a large dataset
status_quo = (df
.withColumn("sum",F.sum(F.col("value")).over(window))
.withColumn("mean",F.avg(F.col("value")).over(window))
.withColumn("min",F.min(F.col("value")).over(window))
.withColumn("max",F.max(F.col("value")).over(window))
.withColumn("stddev",F.stddev(F.col("value")).over(window))
)
status_quo.show()
status_quo.explain()
2. Array column --> higher-order functions
Per this answer this seems to create fewer window specs, but the aggregate() function syntax makes no sense to me, I don't know how to write stddev with higher-order functions, and in small tests the performance doesn't seem much better
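# caution: np.std defaults to ddof=0 (population stddev), while F.stddev and the desired output above use the sample stddev (ddof=1)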
@F.udf(returnType=FloatType())
def array_stddev(row_value):
"""
temporary function since I don't know how to write higher order standard deviation
"""
return np.std(row_value, dtype=float).tolist()
# 1. collect window into array column
# 2. use higher order (array) functions to calculate aggregations over array (window values)
# Question: how to write standard deviation in aggregate()
hof_example = (
df
.withColumn("value_array", F.collect_list(F.col("value")).over(window))
.withColumn("sum_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
.withColumn("mean_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
.withColumn("max_example", F.array_max(F.col("value_array")))
.withColumn("min_example", F.array_min(F.col("value_array")))
.withColumn("std_example", array_stddev(F.col("value_array")))
)
3. Spark SQL
This seems to be the fastest in simple tests. The only (minor) issue is that the rest of my codebase uses the DataFrame API. It seems faster in small tests, but I haven't tested it on the full dataset.
df.createOrReplaceTempView("df")
sql_example = spark.sql(
"""
SELECT
*
, sum(value)
OVER (
PARTITION BY name
ORDER BY CAST(date AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS sum
, mean(value)
OVER (
PARTITION BY name
ORDER BY CAST(date AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS mean
, min(value)
OVER (
PARTITION BY name
ORDER BY CAST(date AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS min
, max(value)
OVER (
PARTITION BY name
ORDER BY CAST(date AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS max
, stddev(value)
OVER (
PARTITION BY name
ORDER BY CAST(date AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS stddev
FROM df"""
)
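(One way to keep the rest of the codebase on the DataFrame API while still using Spark SQL here: spark.sql() returns an ordinary DataFrame, so the query can be hidden behind a small helper. The sketch below is illustrative only; the helper name and view name are made up.)
def rolling_stats_sql(spark, df, days=7, view_name="rolling_input"):
    """Illustrative wrapper: run the windowed SQL and hand back a DataFrame."""
    df.createOrReplaceTempView(view_name)
    over = (
        "OVER (PARTITION BY name ORDER BY CAST(date AS timestamp) "
        "RANGE BETWEEN INTERVAL {d} DAYS PRECEDING AND CURRENT ROW)"
    ).format(d=days)
    return spark.sql(
        """
        SELECT *
        , sum(value) {o} AS sum
        , mean(value) {o} AS mean
        , min(value) {o} AS min
        , max(value) {o} AS max
        , stddev(value) {o} AS stddev
        FROM {v}""".format(o=over, v=view_name)
    )

# usage, equivalent to sql_example above
sql_example_wrapped = rolling_stats_sql(spark, df, days=7)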
Try this aggregate for stddev. You can look at the docs if you want to understand the syntax.
hof_example = (
df
.withColumn("value_array", F.collect_list(F.col("value")).over(window))
.withColumn("sum_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
.withColumn("mean_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
.withColumn("max_example", F.array_max(F.col("value_array")))
.withColumn("min_example", F.array_min(F.col("value_array")))
.withColumn("std_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean_example)*(x - mean_example), acc -> sqrt(acc / (size(value_array) - 1)))'))
)
By the way, I don't think the other two approaches (pyspark Window vs Spark SQL) are any different; the query plans look the same to me. (To keep the query plans small, I only selected min and max.)
Pyspark query plan:
status_quo = (df
.withColumn("min",F.min(F.col("value")).over(window))
.withColumn("max",F.max(F.col("value")).over(window))
)
status_quo.explain()
== Physical Plan ==
*(4) Project [date#3793, name#3794L, value#3795L, min#3800L, max#3807L]
+- Window [max(value#3795L) windowspecdefinition(name#3794L, _w0#3808L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604799, currentrow$())) AS max#3807L], [name#3794L], [_w0#3808L ASC NULLS FIRST]
+- *(3) Sort [name#3794L ASC NULLS FIRST, _w0#3808L ASC NULLS FIRST], false, 0
+- *(3) Project [date#3793, name#3794L, value#3795L, min#3800L, cast(date#3793 as bigint) AS _w0#3808L]
+- Window [min(value#3795L) windowspecdefinition(name#3794L, _w0#3801L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604799, currentrow$())) AS min#3800L], [name#3794L], [_w0#3801L ASC NULLS FIRST]
+- *(2) Sort [name#3794L ASC NULLS FIRST, _w0#3801L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#3794L, 200), true, [id=#812]
+- *(1) Project [date#3793, name#3794L, value#3795L, cast(date#3793 as bigint) AS _w0#3801L]
+- *(1) Scan ExistingRDD[date#3793,name#3794L,value#3795L]
Spark SQL query plan:
df.createOrReplaceTempView("df")
sql_example = spark.sql(
"""
SELECT
*
, min(value)
OVER (
PARTITION BY name
ORDER BY CAST(date AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS min
, max(value)
OVER (
PARTITION BY name
ORDER BY CAST(date AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS max
FROM df"""
)
sql_example.explain()
== Physical Plan ==
*(4) Project [date#3793, name#3794L, value#3795L, min#4670L, max#4671L]
+- Window [max(value#3795L) windowspecdefinition(name#3794L, _w1#4675 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -7 days, currentrow$())) AS max#4671L], [name#3794L], [_w1#4675 ASC NULLS FIRST]
+- *(3) Sort [name#3794L ASC NULLS FIRST, _w1#4675 ASC NULLS FIRST], false, 0
+- *(3) Project [date#3793, name#3794L, value#3795L, _w1#4675, min#4670L]
+- Window [min(value#3795L) windowspecdefinition(name#3794L, _w0#4674 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -7 days, currentrow$())) AS min#4670L], [name#3794L], [_w0#4674 ASC NULLS FIRST]
+- *(2) Sort [name#3794L ASC NULLS FIRST, _w0#4674 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#3794L, 200), true, [id=#955]
+- *(1) Project [date#3793, name#3794L, value#3795L, date#3793 AS _w0#4674, date#3793 AS _w1#4675]
+- *(1) Scan ExistingRDD[date#3793,name#3794L,value#3795L]
Aggregate-function query plan:
hof_example.explain()
== Physical Plan ==
Project [date#3793, name#3794L, value#3795L, value_array#5516, aggregate(value_array#5516, 0.0, lambdafunction((lambda acc#5523 + cast(lambda x#5524L as double)), lambda acc#5523, lambda x#5524L, false), lambdafunction(lambda id#5525, lambda id#5525, false)) AS sum_example#5522, aggregate(value_array#5516, 0.0, lambdafunction((lambda acc#5532 + cast(lambda x#5533L as double)), lambda acc#5532, lambda x#5533L, false), lambdafunction((lambda acc#5534 / cast(size(value_array#5516, true) as double)), lambda acc#5534, false)) AS mean_example#5531, array_max(value_array#5516) AS max_example#5541L, array_min(value_array#5516) AS min_example#5549L, aggregate(value_array#5516, 0.0, lambdafunction((lambda acc#5559 + ((cast(lambda x#5560L as double) - aggregate(value_array#5516, 0.0, lambdafunction((lambda acc#5532 + cast(lambda x#5533L as double)), lambda acc#5532, lambda x#5533L, false), lambdafunction((lambda acc#5534 / cast(size(value_array#5516, true) as double)), lambda acc#5534, false))) * (cast(lambda x#5560L as double) - aggregate(value_array#5516, 0.0, lambdafunction((lambda acc#5532 + cast(lambda x#5533L as double)), lambda acc#5532, lambda x#5533L, false), lambdafunction((lambda acc#5534 / cast(size(value_array#5516, true) as double)), lambda acc#5534, false))))), lambda acc#5559, lambda x#5560L, false), lambdafunction(SQRT((lambda acc#5561 / cast((size(value_array#5516, true) - 1) as double))), lambda acc#5561, false)) AS std_example#5558]
+- Window [collect_list(value#3795L, 0, 0) windowspecdefinition(name#3794L, _w0#5517L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604799, currentrow$())) AS value_array#5516], [name#3794L], [_w0#5517L ASC NULLS FIRST]
+- *(2) Sort [name#3794L ASC NULLS FIRST, _w0#5517L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#3794L, 200), true, [id=#1136]
+- *(1) Project [date#3793, name#3794L, value#3795L, cast(date#3793 as bigint) AS _w0#5517L]
+- *(1) Scan ExistingRDD[date#3793,name#3794L,value#3795L]
Note: I'm marking this as the accepted answer for the time being. If someone finds something faster/better, please let me know and I'll swap it!
Edit note: the calculations shown here assume the input dataframe has been pre-processed with a day-level rollup, i.e. down to day-level granularity.
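For context, such a day-level rollup could look something like the sketch below (the choice of F.sum as the daily aggregation and the column names are only illustrative; adjust to whatever makes sense for your data):
# hypothetical day-level rollup so that df has one row per name per day
daily_df = (
    df
    .withColumn("date", F.col("date").cast("date"))   # truncate the timestamp to a date
    .groupBy("name", "date")
    .agg(F.sum("value").alias("value"))               # illustrative daily aggregation
)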
After I posted the question I tested several different options on my real dataset (and got some input from coworkers), and I believe the fastest approach (for large datasets) uses pyspark.sql.functions.window() with groupby().agg instead of pyspark.sql.window.Window(). A similar answer can be found here.
The steps to make this work are:
- sort the dataframe by name and date (in the example dataframe)
- .persist() the dataframe
- compute the grouped dataframe using F.window() and, for each window needed, join it back to df.
The best/easiest way to see this in action is in the SQL diagram in the Spark GUI. When using Window(), the SQL execution is completely sequential. However, when using F.window(), the diagram shows parallelization! Note: on small datasets Window() still seems to be faster.
In my tests on real data with 7-day windows, Window() was 3-5x slower than F.window(). The only downside is that F.window() is less convenient to use. I show some code and screenshots below for reference.
Fastest solution found (F.window() with groupby.agg())
# this turned out to be super important for tricking spark into parallelizing things
df = df.orderBy("name", "date")
df.persist()
fwindow7 = F.window(
F.col("date"),
windowDuration="7 days",
slideDuration="1 days",
).alias("window")
gb7 = (
df
.groupBy(F.col("name"), fwindow7)
.agg(
F.sum(F.col("value")).alias("sum7"),
F.avg(F.col("value")).alias("mean7"),
F.min(F.col("value")).alias("min7"),
F.max(F.col("value")).alias("max7"),
F.stddev(F.col("value")).alias("stddev7"),
F.count(F.col("value")).alias("cnt7")
)
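# F.window() produces a struct with half-open [start, end) bounds; end is exclusive,
# so end - 1 day is the last day the window covers and becomes the join key back to df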
.withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
.drop("window")
)
window_function_example = df.join(gb7, ["name", "date"], how="left")
fwindow14 = F.window(
F.col("date"),
windowDuration="14 days",
slideDuration="1 days",
).alias("window")
gb14 = (
df
.groupBy(F.col("name"), fwindow14)
.agg(
F.sum(F.col("value")).alias("sum14"),
F.avg(F.col("value")).alias("mean14"),
F.min(F.col("value")).alias("min14"),
F.max(F.col("value")).alias("max14"),
F.stddev(F.col("value")).alias("stddev14"),
F.count(F.col("value")).alias("cnt14")
)
.withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
.drop("window")
)
window_function_example = window_function_example.join(gb14, ["name", "date"], how="left")
window_function_example.orderBy("name", "date").show(truncate=True)
SQL diagram
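Since I actually need four windows (3, 7, 14, 21 days), the 7- and 14-day blocks above can be folded into a small helper; this is just a sketch of the same pattern and hasn't been benchmarked on the full dataset:
def rolling_stats(df, days):
    """Grouped rolling stats for one window length, following the F.window() pattern above."""
    fwindow = F.window(
        F.col("date"),
        windowDuration="{} days".format(days),
        slideDuration="1 days",
    ).alias("window")
    suffix = str(days)
    return (
        df
        .groupBy(F.col("name"), fwindow)
        .agg(
            F.sum("value").alias("sum" + suffix),
            F.avg("value").alias("mean" + suffix),
            F.min("value").alias("min" + suffix),
            F.max("value").alias("max" + suffix),
            F.stddev("value").alias("stddev" + suffix),
            F.count("value").alias("cnt" + suffix),
        )
        .withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
        .drop("window")
    )

all_windows = df
for d in [3, 7, 14, 21]:
    all_windows = all_windows.join(rolling_stats(df, d), ["name", "date"], how="left")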
Option 2 from the original question (higher-order functions applied over Window())
window7 = (
Window
.partitionBy(F.col("name"))
.orderBy(F.col("date").cast("timestamp").cast("long"))
.rangeBetween(-7 * 60 * 60 * 24 + 1, Window.currentRow)
)
window14 = (
Window
.partitionBy(F.col("name"))
.orderBy(F.col("date").cast("timestamp").cast("long"))
.rangeBetween(-14 * 60 * 60 * 24 + 1, Window.currentRow)
)
hof_example = (
df
.withColumn("value_array", F.collect_list(F.col("value")).over(window7))
.withColumn("sum7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
.withColumn("mean7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
.withColumn("max7", F.array_max(F.col("value_array")))
.withColumn("min7", F.array_min(F.col("value_array")))
.withColumn("std7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean7)*(x - mean7), acc -> sqrt(acc / (size(value_array) - 1)))'))
.withColumn("count7", F.size(F.col("value_array")))
.drop("value_array")
)
hof_example = (
hof_example
.withColumn("value_array", F.collect_list(F.col("value")).over(window14))
.withColumn("sum14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
.withColumn("mean14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
.withColumn("max14", F.array_max(F.col("value_array")))
.withColumn("min14", F.array_min(F.col("value_array")))
.withColumn("std14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean14)*(x - mean14), acc -> sqrt(acc / (size(value_array) - 1)))'))
.withColumn("count14", F.size(F.col("value_array")))
.drop("value_array")
)
hof_example.show(truncate=True)
SQL diagram snippet