Different partition number when unioning Spark DataFrames with the Scala and Python APIs

I was checking the number of partitions of the union of two identical Spark DataFrames, and I noticed that the result is not the same with the Scala and Python APIs.

With Python, the number of partitions of the union is the sum of the partition counts of the two DataFrames, which is the expected behavior.

Python

from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df1 partitions: %d" %df1.rdd.getNumPartitions())

df2 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df2 partitions: %d" %df2.rdd.getNumPartitions())

df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())

Result:

df1 partitions: 10
df2 partitions: 10
df3 partitions: 20

However, with Scala, the number of partitions of the union does not change.

Scala

val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")

val df2 = (1 to 100000 by 1).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")

Result:

df1 partitions: 10
df2 partitions: 10
df3 partitions: 10

This only happens when the two DataFrames are built in exactly the same way.

When they are not:

val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")

val df2 = (1 to 100000 by 2).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")

I get the expected result (the sum):

df1 partitions: 10
df2 partitions: 10
df3 partitions: 20

My understanding is that with the Scala API, Spark is able to optimize the union in some cases. Is this true? Does it mean that the execution plan of a union can differ between the Scala and Python APIs?

I am asking this because I have noticed that unions are more efficient with Scala than with Python, especially when several unions are chained.
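One way to check whether the plans actually differ is simply to print them on both sides; PySpark DataFrames expose the same explain() call, so the comparison is symmetric. A minimal sketch for spark-shell (the exact plan text depends on the Spark version):

import spark.implicits._   // implicit toDF conversions; already in scope in spark-shell

val df1 = (1 to 100000).toDF.repartition(10)
val df2 = (1 to 100000).toDF.repartition(10)
val df3 = df1.union(df2)

// Print the physical plan; running the equivalent code in PySpark and calling
// df3.explain() there lets you compare the two plans side by side.
df3.explain()
println(df3.rdd.getNumPartitions)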

Definition of union in Spark - Scala

def union(other: Dataset[T]): Dataset[T] = withSetOperator {
    // This breaks caching, but it's usually ok because it addresses a very specific use case:
    // using union to union many files or partitions.
    CombineUnions(Union(logicalPlan, other.logicalPlan))
  }
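The CombineUnions call above eagerly flattens nested unions, so a chain of union calls ends up as a single Union node over all inputs instead of a nested tree, which is part of why many successive unions stay cheap. A small spark-shell sketch to observe this (plan text varies slightly by Spark version):

// Each union call wraps the previous plan in a Union, and CombineUnions
// immediately collapses Union(Union(a, b), c) into a single Union(a, b, c).
val a = (1 to 10).toDF
val b = (11 to 20).toDF
val c = (21 to 30).toDF

val chained = a.union(b).union(c)
chained.explain(true)   // the logical plan shows one Union with three children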

Definition of union in PySpark

def union(self, other):
    """Return a new :class:`DataFrame` containing union of rows in this and another
    :class:`DataFrame`.

    This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
    (that does deduplication of elements), use this function followed by :func:`distinct`.

    Also as standard in SQL, this function resolves columns by position (not by name).
    """
    return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)

Refer to the code here to understand the difference: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py

I will keep updating this with any further findings.

Observation 1 -- the physical plans differ between Scala and Python

Union physical plan (PySpark):
:- Exchange RoundRobinPartitioning(10), [id=#1318]
:  +- *(1) Scan ExistingRDD[value#148]
+- Exchange RoundRobinPartitioning(10), [id=#1320]
   +- *(2) Scan ExistingRDD[value#154]



== Physical Plan (Scala) ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1012]
:  +- LocalTableScan [value#122]
+- ReusedExchange [value#131], Exchange RoundRobinPartitioning(10), [id=#1012]


== Physical Plan (Scala, with val df2 = (1 to 10 by 2).toDF.repartition(10)) ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1644]
:  +- LocalTableScan [value#184]
+- Exchange RoundRobinPartitioning(10), [id=#1646]
   +- LocalTableScan [value#193]

Observation 2 -- a union in Spark does not trigger a shuffle; it is a very cheap operation. I believe it is the explicit repartition of df1 and df2 that causes the partition count of the union df3 to change. If you do not explicitly repartition your input DataFrames, you end up with a union DataFrame whose partition count is the sum of df1's and df2's. I tried a few permutations on the same data and got the results below.

Case 1

from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame(range(100000), IntegerType())
print("df1 partitions: %d" % df1.rdd.getNumPartitions())
print("df1 partitioner: %s" % df1.rdd.partitioner)

df2 = spark.createDataFrame(range(100000), IntegerType())
print("df2 partitions: %d" % df2.rdd.getNumPartitions())
print("df2 partitioner: %s" % df2.rdd.partitioner)

df3 = df1.union(df2)
print("df3 partitions: %d" % df3.rdd.getNumPartitions())
print("df3 partitioner: %s" % df3.rdd.partitioner)

******O/P*********

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None

df3 partitions: 16
df3 partitioner: None

Case 2

val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")

val df2 = (1 to 100000).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")

df1.union(df2).explain()

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")

******O/P*********

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None

Case 3

val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")
val df2 = (1 to 100000 by 2).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")

****O/P****

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None
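Consistent with Observation 2, none of these unions introduces a shuffle of its own; when the two sides are planned independently, the partition count is simply the sum of the inputs. A quick way to confirm this in spark-shell (a sketch; the plan text varies by Spark version) is to check that the physical plan of a plain union contains no Exchange node:

// Union without any explicit repartition: the plan should contain no Exchange,
// and the result just concatenates the input partitions, so the partition
// count is the sum of the two inputs (16 with the default parallelism of 8).
val left = (1 to 100000).toDF
val right = (1 to 100000).toDF
val both = left.union(right)

both.explain()                       // expect a Union over two scans, no Exchange
println(both.rdd.getNumPartitions)   // sum of the inputs' partition counts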

The clue comes from the explain output of the Scala engine:

Union
:- Exchange RoundRobinPartitioning(10), [id=#757]
:  +- LocalTableScan [value#154]
+- ReusedExchange [value#159], Exchange RoundRobinPartitioning(10), [id=#757]

ReusedExchange is a form of optimization: Catalyst recognizes that the two DataFrames are identical, so it reuses the output of the same shuffle for both sides of the union.

If you have one DataFrame with 10000 entries and another with 10001, you get 20 partitions. Spark is being somewhat clever here.
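If you want to confirm that exchange reuse is what keeps the Scala union at 10 partitions, one possible check (a sketch only: spark.sql.exchange.reuse is an internal Spark SQL configuration, so treat its availability and runtime behaviour as an assumption for your Spark version) is to disable it in spark-shell and rerun the identical-DataFrame case:

// Assumption: the internal config spark.sql.exchange.reuse controls the
// ReuseExchange rule. With it disabled, df2's shuffle can no longer be
// replaced by a ReusedExchange, so the union should report 20 partitions
// even though the two DataFrames are built identically.
spark.conf.set("spark.sql.exchange.reuse", "false")

val df1 = (1 to 100000).toDF.repartition(10)
val df2 = (1 to 100000).toDF.repartition(10)
val df3 = df1.union(df2)

df3.explain()                       // expect two separate Exchange nodes
println(df3.rdd.getNumPartitions)   // expected: 20

spark.conf.set("spark.sql.exchange.reuse", "true")   // restore the default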