Avoid performance impact of a single partition mode in Spark window functions
My question is triggered by the use case of computing the differences between consecutive rows in a Spark DataFrame.
For example, I have:
>>> df.show()
+-----+----------+
|index| col1|
+-----+----------+
| 0.0|0.58734024|
| 1.0|0.67304325|
| 2.0|0.85154736|
| 3.0| 0.5449719|
+-----+----------+
If I choose to compute these using a "Window" function, then I can do it like this:
>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index| col1| diffs_col1|
+-----+----------+-----------+
| 0.0|0.58734024|0.085703015|
| 1.0|0.67304325| 0.17850411|
| 2.0|0.85154736|-0.30657548|
| 3.0| 0.5449719| null|
+-----+----------+-----------+
Question: I explicitly partitioned the DataFrame into a single partition. What is the performance impact of this, if any, why is that the case, and how can I avoid it? Because when I do not specify a partition, I get the following warning:
16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
In practice, the performance impact is almost the same as omitting the partitionBy clause entirely: all records are shuffled to a single partition, sorted locally, and iterated over one by one.
The only difference is the total number of partitions created. Let's illustrate that with an example using a simple dataset with 10 partitions and 1000 records:
df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))
If you define a frame without partitioning:
w_unpart = Window.orderBy(f.col("index").asc())
and use it with lag:
df_lag_unpart = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)
there will be only one partition in total:
df_lag_unpart.rdd.glom().map(len).collect()
[1000]
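If you want to double-check this, the physical plan should show the data being exchanged into a single partition before the Window operator (the exact plan text depends on your Spark version):
df_lag_unpart.explain()  # look for an exchange into a single partition feeding the Window node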
A frame definition with a dummy partitioning key, by comparison (simplified a bit compared to your code):
w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())
will use a number of partitions equal to spark.sql.shuffle.partitions:
spark.conf.set("spark.sql.shuffle.partitions", 11)
df_lag_part = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)
df_lag_part.rdd.glom().count()
11
with only one non-empty partition:
df_lag_part.rdd.glom().filter(lambda x: x).count()
1
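You can see the same thing by looking at the partition sizes directly (which of the 11 partitions ends up holding the data may vary):
df_lag_part.rdd.glom().map(len).collect()
# all 1000 records land in a single partition, e.g. [0, 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0]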
Unfortunately, there is no universal solution which can be used to address this problem in PySpark. It is just an inherent mechanism of the implementation combined with the distributed processing model.
Since the index column is sequential, you could generate an artificial partitioning key with a fixed number of records per block:
rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))
df_with_block = df.withColumn(
    "block", (f.col("index") / rec_per_block).cast("int")
)
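As a quick sanity check, each block should hold rec_per_block records (except possibly the last one):
df_with_block.groupBy("block").count().orderBy("block").show()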
and use the block column to define the frame specification:
w_with_block = Window.partitionBy("block").orderBy("index")
df_lag_with_block = df_with_block.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)
This will use the expected number of partitions:
df_lag_with_block.rdd.glom().count()
11
with a roughly uniform data distribution (we cannot avoid hash collisions):
df_lag_with_block.rdd.glom().map(len).collect()
[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]
but with a number of gaps on the block boundaries:
df_lag_with_block.where(f.col("diffs_col1").isNull()).count()
12
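These nulls are simply the first row of each block, where lag has nothing to look back to within its partition; you can confirm that with:
df_lag_with_block.where(f.col("diffs_col1").isNull()).select("index", "block").orderBy("index").show()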
Since the boundaries are easy to compute:
from itertools import chain
boundary_idxs = sorted(chain.from_iterable(
    # Here we depend on sequential identifiers
    # This could be generalized to any monotonically increasing
    # id by taking min and max per block
    (idx - 1, idx) for idx in
    df_lag_with_block.groupBy("block").min("index")
        .drop("block").rdd.flatMap(lambda x: x)
        .collect()))[2:]  # The first boundary doesn't carry useful information.
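If the identifiers were not strictly consecutive, the same idea could be expressed, as the comment above suggests, by collecting min and max per block (a rough sketch of that variant; the names block_bounds and boundary_idxs_general are just illustrative):
# Collect the first and last id of each block, ordered by block
block_bounds = (df_with_block
    .groupBy("block")
    .agg(f.min("index").alias("min_id"), f.max("index").alias("max_id"))
    .orderBy("block")
    .collect())
# Boundary rows are the last row of each block plus the first row of the next one
boundary_idxs_general = sorted(chain.from_iterable(
    (prev["max_id"], cur["min_id"])
    for prev, cur in zip(block_bounds, block_bounds[1:])
))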
you can always select the boundary rows:
missing = df_with_block.where(f.col("index").isin(boundary_idxs))
and fill these in separately:
# We use window without partitions here. Since number of records
# will be small this won't be a performance issue
# but will generate "Moving all data to a single partition" warning
missing_with_lag = missing.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))
and join:
combined = (df_lag_with_block
    .join(missing_with_lag, ["index"], "leftouter")
    .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))
to get the desired result:
mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0