为什么我的构建挂起/需要很长时间才能生成包含多个联合的查询计划?

Why is my build hanging / taking a long time to generate my query plan with many unions?

我注意到,当我运行与我在另一个问题中给出的示例(原文此处为链接)相同的代码,但使用 union、unionByName 或 unionAll 而不是 join 时,生成查询计划需要花费相当长的时间,并且可能导致驱动程序(driver)内存溢出(OOM)。

此处附上代码以供参考,其中 for() 循环内部的逻辑与之前的示例略有不同。

from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()

# Left-hand table: two integer key columns followed by two float measures,
# every field declared non-nullable.
schema = T.StructType(
  [T.StructField(name, T.IntegerType(), False) for name in ("col_1", "col_2")]
  + [T.StructField(name, T.FloatType(), False) for name in ("measure_1", "measure_2")]
)
data = [
  dict(col_1=1, col_2=2, measure_1=0.5, measure_2=1.5),
  dict(col_1=2, col_2=3, measure_1=2.5, measure_2=3.5),
]

df = spark.createDataFrame(data, schema)

# Right-hand table: a single key column in which each key appears twice.
right_schema = T.StructType([T.StructField("col_1", T.IntegerType(), False)])
right_data = [{"col_1": key} for key in (1, 1, 2, 2)]
right_df = spark.createDataFrame(right_data, right_schema)

# Union the frame with itself, then join the result to the right-hand table
# on the shared key column.
df = df.unionByName(df).join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

# Print the physical plan of the unioned-and-joined DataFrame.
df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))

# NOTE: `df` is reassigned on every pass, so each new filter is applied to
# the already-unioned result of all previous passes. This is the pattern
# whose query plan is being examined.
for filter_col in filter_union_cols:
  below_one = F.col(filter_col) < F.lit(1)
  tag = F.lit(filter_col).alias("found_filter")
  stats = df.filter(below_one).drop("found_filter")
  df = df.unionByName(stats.select("*", tag))

df.show()

"""
+-----+-----+---------+---------+------------+                                  
|col_1|col_2|measure_1|measure_2|found_filter|
+-----+-----+---------+---------+------------+
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
+-----+-----+---------+---------+------------+
"""

# Print the physical plan produced after the loop of unions.
df.explain()

# The resulting query plan is REALLY long: it contains one Union branch per
# accumulated combination of filters.

"""
== Physical Plan ==
Union
:- *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, null AS found_filter#1855]
:  +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7637]
:     :     +- Union
:     :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:           +- *(4) Scan ExistingRDD[col_1#1808]
:- *(12) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_1 AS found_filter#1860]
:  +- *(12) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(9) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7654]
:     :     +- Union
:     :        :- *(7) Filter (col_1#1800 < 1)
:     :        :  +- *(7) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(8) Filter (col_1#1800 < 1)
:     :           +- *(8) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(11) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:           +- *(10) Filter (col_1#1808 < 1)
:              +- *(10) Scan ExistingRDD[col_1#1808]
:- *(18) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#1880]
:  +- *(18) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(15) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7671]
:     :     +- Union
:     :        :- *(13) Filter (measure_1#1802 < 1.0)
:     :        :  +- *(13) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(14) Filter (measure_1#1802 < 1.0)
:     :           +- *(14) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(17) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(24) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#2022]
:  +- *(24) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(21) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7688]
:     :     +- Union
:     :        :- *(19) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :        :  +- *(19) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(20) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :           +- *(20) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(23) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(30) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#1900]
:  +- *(30) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(27) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7705]
:     :     +- Union
:     :        :- *(25) Filter (col_2#1801 < 1)
:     :        :  +- *(25) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(26) Filter (col_2#1801 < 1)
:     :           +- *(26) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(29) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(36) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2023]
:  +- *(36) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(33) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7722]
:     :     +- Union
:     :        :- *(31) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :        :  +- *(31) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(32) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :           +- *(32) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(35) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(42) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2024]
:  +- *(42) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(39) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7739]
:     :     +- Union
:     :        :- *(37) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :        :  +- *(37) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(38) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :           +- *(38) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(41) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(48) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2028]
:  +- *(48) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(45) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7756]
:     :     +- Union
:     :        :- *(43) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :        :  +- *(43) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(44) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :           +- *(44) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(47) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(54) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#1920]
:  +- *(54) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(51) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7773]
:     :     +- Union
:     :        :- *(49) Filter (measure_2#1803 < 1.0)
:     :        :  +- *(49) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(50) Filter (measure_2#1803 < 1.0)
:     :           +- *(50) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(53) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(60) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2025]
:  +- *(60) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(57) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7790]
:     :     +- Union
:     :        :- *(55) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(55) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(56) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(56) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(59) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(66) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2026]
:  +- *(66) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(63) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7807]
:     :     +- Union
:     :        :- *(61) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :        :  +- *(61) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(62) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :           +- *(62) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(65) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(72) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2029]
:  +- *(72) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(69) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7824]
:     :     +- Union
:     :        :- *(67) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(67) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(68) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :           +- *(68) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(71) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(78) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2027]
:  +- *(78) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(75) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7841]
:     :     +- Union
:     :        :- *(73) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(73) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(74) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(74) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(77) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(84) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2030]
:  +- *(84) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(81) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7858]
:     :     +- Union
:     :        :- *(79) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(79) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(80) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(80) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(83) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(90) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2031]
:  +- *(90) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(87) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7875]
:     :     +- Union
:     :        :- *(85) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(85) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(86) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(86) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(89) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
+- *(96) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2032]
   +- *(96) SortMergeJoin [col_1#1800], [col_1#1808], Inner
      :- *(93) Sort [col_1#1800 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7892]
      :     +- Union
      :        :- *(91) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :        :  +- *(91) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      :        +- *(92) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :           +- *(92) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      +- *(95) Sort [col_1#1808 ASC NULLS FIRST], false, 0
         +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
"""

我在这里看到一个明显更长的查询计划,尤其是随着 for() 循环迭代次数的增加,性能严重下降。

我该如何提升性能?

这是 Spark 中迭代算法的一个已知限制。目前,循环的每次迭代都会导致内部节点被重新计算并堆叠在外部 df 变量上。

这意味着生成查询计划的时间复杂度为 O(exp(n)),其中 n 是循环的迭代次数。例如,上面的示例只循环了 4 次,最终的物理计划中就已经出现了 16(即 2^4)个 Union 分支。

Palantir Foundry 中有一个名为 Transforms Verbs 的工具可以帮助解决这个问题。

只需导入 transforms.verbs.dataframes.union_many,并对您希望具体化(materialize)的所有数据帧调用它(前提是您的逻辑允许这样做,即循环的某次迭代不依赖于之前迭代的结果)。

上面的代码应该修改为:

from pyspark.sql import types as T, functions as F, SparkSession
from transforms.verbs.dataframes import union_many

spark = SparkSession.builder.getOrCreate()

# Left-hand table: two integer key columns followed by two float measures,
# every field declared non-nullable.
schema = T.StructType(
  [T.StructField(name, T.IntegerType(), False) for name in ("col_1", "col_2")]
  + [T.StructField(name, T.FloatType(), False) for name in ("measure_1", "measure_2")]
)
data = [
  dict(col_1=1, col_2=2, measure_1=0.5, measure_2=1.5),
  dict(col_1=2, col_2=3, measure_1=2.5, measure_2=3.5),
]

df = spark.createDataFrame(data, schema)

# Right-hand table: a single key column in which each key appears twice.
right_schema = T.StructType([T.StructField("col_1", T.IntegerType(), False)])
right_data = [{"col_1": key} for key in (1, 1, 2, 2)]
right_df = spark.createDataFrame(right_data, right_schema)

# Union the frame with itself, then join the result to the right-hand table
# on the shared key column.
df = df.unionByName(df).join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

# Print the physical plan of the unioned-and-joined DataFrame.
df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))

# Build every filtered slice from the same base `df` and collect the slices
# in a list. `df` is NOT reassigned inside the loop, so no iteration depends
# on the result of an earlier one.
union_dfs = []
for filter_col in filter_union_cols:
  stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
  union_df = stats.select(
    "*",
    F.lit(filter_col).alias("found_filter")
  )
  union_dfs.append(union_df)

# union_many expects the DataFrames as separate positional arguments, so the
# list must be unpacked with `*`. Passing the list object itself is reported
# to fail with an attribute error on `list`.
df = df.unionByName(
  union_many(*union_dfs)
)

这将优化您的联合操作,并显著减少生成查询计划所需的时间。

要点:在 for/while 循环中调用任何 union 时都要格外小心。如果确实需要这种行为,请使用 transforms.verbs.dataframes.union_many 这个动词(verb)来优化最终要合并的数据帧集合。

查看您的平台文档以获取更多信息和更有用的动词。

提示:还可以使用此处提到的优化(原文此处为链接)来进一步提高性能。

如果您在尝试实现原帖作者(OP)的解决方案时,收到关于 list 没有 unionByName/union_many 属性的错误,请尝试在最后一行用星号前缀对 union_dfs 进行解包,如下所示:

# Unpack the list so that each DataFrame is passed to union_many as its own
# positional argument, then append the combined result to `df`.
combined = union_many(*union_dfs)
df = df.unionByName(combined)

显然,Foundry 不会自动解包传入的数据帧引用列表。