什么是 spark 中的最佳选择:联合然后加入或加入然后联合?

What is optimal in spark: union then join or join then union?

给定三个不同的数据帧,df1df2,它们具有相同的架构,以及 df3。三个数据帧有一个共同的字段。

还要考虑 df1df2 各有大约 4200 万条记录,df3 有大约 10 万条记录。

spark 中什么是最优的:

老实说,这些数量并不重要。

查看两种方法的 .explain() 并没有太多内容。

A broadcast join 在这两种情况下都很明显。另外 union 不会导致 shuffle,至少你的问题并不意味着,即由于可能导致的转换。

也就是说性能是/应该相等的。见下文,模拟 DF 方法,但对所讨论的要点进行了演示。数学上没有多少其他决定。

方法一

import org.apache.spark.sql.functions.{sha1, rand, col}

val randomDF1 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF2 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF3 = (spark.range(1, 100000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val u = randomDF1.union(randomDF2) 
 val u2 = u.join(randomDF3, "id").explain()

== Physical Plan ==
*(4) Project [id#25284L, hash#25296, hash#25326]
+- *(4) BroadcastHashJoin [id#25284L], [id#25314L], Inner, BuildRight
   :- Union
   :  :- *(1) Project [id#25284L, sha1(cast(random_value#25286 as binary)) AS hash#25296]
   :  :  +- *(1) Project [id#25284L, cast(rand(10) as string) AS random_value#25286]
   :  :     +- *(1) Range (1, 42000000, step=1, splits=2)
   :  +- *(2) Project [id#25299L, sha1(cast(random_value#25301 as binary)) AS hash#25311]
   :     +- *(2) Project [id#25299L, cast(rand(10) as string) AS random_value#25301]
   :        +- *(2) Range (1, 42000000, step=1, splits=2)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13264]
   +- *(3) Project [id#25314L, sha1(cast(random_value#25316 as binary)) AS hash#25326]
      +- *(3) Project [id#25314L, cast(rand(10) as string) AS random_value#25316]
         +- *(3) Range (1, 100000, step=1, splits=2)

方法二

import org.apache.spark.sql.functions.{sha1, rand, col}

val randomDF1 = (spark.range(1, 42000000)
  .withColumn("random_value", rand(seed=10).cast("string"))
  .withColumn("hash", sha1($"random_value"))
  .drop("random_value")
).toDF("id", "hash")

val randomDF2 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF3 = (spark.range(1, 100000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val u1 = randomDF1.join(randomDF3, "id") 
val u2 = randomDF2.join(randomDF3, "id") 
val u3 = u1.union(u2).explain() 

== Physical Plan ==
Union
:- *(2) Project [id#25335L, hash#25347, hash#25377]
:  +- *(2) BroadcastHashJoin [id#25335L], [id#25365L], Inner, BuildRight
:     :- *(2) Project [id#25335L, sha1(cast(random_value#25337 as binary)) AS hash#25347]
:     :  +- *(2) Project [id#25335L, cast(rand(10) as string) AS random_value#25337]
:     :     +- *(2) Range (1, 42000000, step=1, splits=2)
:     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]
:        +- *(1) Project [id#25365L, sha1(cast(random_value#25367 as binary)) AS hash#25377]
:           +- *(1) Project [id#25365L, cast(rand(10) as string) AS random_value#25367]
:              +- *(1) Range (1, 100000, step=1, splits=2)
+- *(4) Project [id#25350L, hash#25362, hash#25377]
   +- *(4) BroadcastHashJoin [id#25350L], [id#25365L], Inner, BuildRight
      :- *(4) Project [id#25350L, sha1(cast(random_value#25352 as binary)) AS hash#25362]
      :  +- *(4) Project [id#25350L, cast(rand(10) as string) AS random_value#25352]
      :     +- *(4) Range (1, 42000000, step=1, splits=2)
  +- ReusedExchange [id#25365L, hash#25377], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]