我可以在sparkConf中为不同的sql设置不同的autoBroadcastJoinThreshold值吗?
Can I set different autoBroadcastJoinThreshold value in sparkConf for different sql?
我有大数据框:A(200g), B(20m), C(15m), D(10m), E(12m), 我想加入他们在一起:A 加入 B,C 在同一个 SparkSession 中使用 spark sql 加入 D 和 E**。就像:
absql:sql("select * from A a inner join B b on a.id=b.id").write.csv("/path/for/ab")
cdesql:sql("select * from C c inner join D d on c.id=d.id inner join E e on c.id=e.id").write.csv("/path/for/cde")
问题:
当我使用默认值时spark.sql.autoBroadcastJoinThreshold=10m
- absql会花很长时间,原因是absql偏斜。
- cdesql正常
当我设置spark.sql.autoBroadcastJoinThreshold=20m
- C,D,E会被广播,所有的任务都会在同一个executor中执行,还是需要很长时间。
- 如果设置num-executors=200,则广播时间较长
- absql正常
您可以标记要广播的数据帧,而不是更改 autoBroadcastJoinThreshold
。这样,很容易决定哪些数据帧应该广播或不广播。
在 Scala 中它看起来像这样:
import org.apache.spark.sql.functions.broadcast
val B2 = broadcast(B)
B2.createOrReplaceTempView("B")
此处数据帧 B 已被标记为广播,然后被注册为 table 以与 Spark SQL 一起使用。
或者,这可以直接用数据帧完成 API,第一个连接可以写成:
A.join(broadcast(B), Seq("id"), "inner")
我有大数据框:A(200g), B(20m), C(15m), D(10m), E(12m), 我想加入他们在一起:A 加入 B,C 在同一个 SparkSession 中使用 spark sql 加入 D 和 E**。就像:
absql:sql("select * from A a inner join B b on a.id=b.id").write.csv("/path/for/ab")
cdesql:sql("select * from C c inner join D d on c.id=d.id inner join E e on c.id=e.id").write.csv("/path/for/cde")
问题:
当我使用默认值时spark.sql.autoBroadcastJoinThreshold=10m
- absql会花很长时间,原因是absql偏斜。
- cdesql正常
当我设置spark.sql.autoBroadcastJoinThreshold=20m
- C,D,E会被广播,所有的任务都会在同一个executor中执行,还是需要很长时间。
- 如果设置num-executors=200,则广播时间较长
- absql正常
您可以标记要广播的数据帧,而不是更改 autoBroadcastJoinThreshold
。这样,很容易决定哪些数据帧应该广播或不广播。
在 Scala 中它看起来像这样:
import org.apache.spark.sql.functions.broadcast
val B2 = broadcast(B)
B2.createOrReplaceTempView("B")
此处数据帧 B 已被标记为广播,然后被注册为 table 以与 Spark SQL 一起使用。
或者,这可以直接用数据帧完成 API,第一个连接可以写成:
A.join(broadcast(B), Seq("id"), "inner")