我怎样才能有效地将一个大的 rdd 加入到一个非常大的 rdd 中?
How can I efficiently join a large rdd to a very large rdd in spark?
我有两个 RDD。一个 RDD 在 5-10 百万个条目之间,另一个 RDD 在 5 亿到 7.5 亿个条目之间。在某些时候,我必须使用公共密钥加入这两个 rdds。
val rddA = someData.rdd.map { x => (x.key, x); } // 10-million
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million
var joinRDD = rddA.join(rddB);
当 spark 决定执行此连接时,它决定执行 ShuffledHashJoin。这导致 rddB 中的许多项目在网络上被打乱。同样,一些 rddA 也在网络上进行了洗牌。在这种情况下,rddA 太 "big" 无法用作广播变量,但似乎 BroadcastHashJoin 会更有效。是否提示要使用 BroadcastHashJoin? (Apache Flink 通过连接提示支持这一点)。
如果不是,增加 autoBroadcastJoinThreshold 是唯一的选择吗?
7 月 14 日更新
我的性能问题似乎与重新分区有关。通常,从 HDFS 读取的 RDD 将按块进行分区,但在这种情况下,源是 [我制作的] 镶木地板数据源。 spark(databricks)写parquet文件时,一个分区写一个文件,同理,一个文件读一个分区。因此,我发现的最佳答案是在数据源的生产过程中,通过键对其进行分区,然后写出 parquet sink(然后自然地共同分区)并将其用作 rddB。
给出的答案是正确的,但我认为有关 parquet 数据源的详细信息可能对其他人有用。
您可以使用相同的分区程序对 RDD 进行分区,在这种情况下,具有相同键的分区将并置在相同的执行程序上。
在这种情况下,您将避免 join 操作的随机播放。
Shuffle 只会发生一次,当你更新 parititoner 时,如果你将缓存 RDD 的所有连接,那之后应该对执行者来说是本地的
import org.apache.spark.SparkContext._
class A
class B
val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???
val partitioner = new HashPartitioner(1000)
rddA.partitionBy(partitioner).cache()
rddB.partitionBy(partitioner).cache()
您也可以尝试更新广播阈值大小,也许 rddA 可以广播:
--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb
我们使用 400mb 进行广播连接,效果很好。
我有两个 RDD。一个 RDD 在 5-10 百万个条目之间,另一个 RDD 在 5 亿到 7.5 亿个条目之间。在某些时候,我必须使用公共密钥加入这两个 rdds。
val rddA = someData.rdd.map { x => (x.key, x); } // 10-million
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million
var joinRDD = rddA.join(rddB);
当 spark 决定执行此连接时,它决定执行 ShuffledHashJoin。这导致 rddB 中的许多项目在网络上被打乱。同样,一些 rddA 也在网络上进行了洗牌。在这种情况下,rddA 太 "big" 无法用作广播变量,但似乎 BroadcastHashJoin 会更有效。是否提示要使用 BroadcastHashJoin? (Apache Flink 通过连接提示支持这一点)。
如果不是,增加 autoBroadcastJoinThreshold 是唯一的选择吗?
7 月 14 日更新
我的性能问题似乎与重新分区有关。通常,从 HDFS 读取的 RDD 将按块进行分区,但在这种情况下,源是 [我制作的] 镶木地板数据源。 spark(databricks)写parquet文件时,一个分区写一个文件,同理,一个文件读一个分区。因此,我发现的最佳答案是在数据源的生产过程中,通过键对其进行分区,然后写出 parquet sink(然后自然地共同分区)并将其用作 rddB。
给出的答案是正确的,但我认为有关 parquet 数据源的详细信息可能对其他人有用。
您可以使用相同的分区程序对 RDD 进行分区,在这种情况下,具有相同键的分区将并置在相同的执行程序上。
在这种情况下,您将避免 join 操作的随机播放。
Shuffle 只会发生一次,当你更新 parititoner 时,如果你将缓存 RDD 的所有连接,那之后应该对执行者来说是本地的
import org.apache.spark.SparkContext._
class A
class B
val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???
val partitioner = new HashPartitioner(1000)
rddA.partitionBy(partitioner).cache()
rddB.partitionBy(partitioner).cache()
您也可以尝试更新广播阈值大小,也许 rddA 可以广播:
--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb
我们使用 400mb 进行广播连接,效果很好。