如何在加入 Spark 之前正确应用 HashPartitioner?

How to properly apply HashPartitioner before a join in Spark?

为了减少两个RDD连接时的shuffling,我决定先使用HashPartitioner对它们进行分区。我是这样做的。我这样做是否正确,或者有更好的方法吗?

val rddA = ...
val rddB = ...

val numOfPartitions = rddA.getNumPartitions

val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))

val rddAB = rddApartitioned.join(rddBpartitioned)

To reduce shuffling during the joining of two RDDs,

令人惊讶的是,重新分区会减少甚至消除洗牌是一种常见的误解。 没有。重新分区 是 shuffle 最纯粹的形式。它不会节省时间、带宽或内存。

使用主动分区程序背后的基本原理是不同的 - 它允许您洗牌一次,并重用状态,执行多个按键操作,而无需额外的洗牌(尽管作为据我所知,不一定没有额外的网络流量,,不包括在单个操作中发生随机播放的情况)。

所以你的代码是正确的,但如果你加入一次,它不会给你买任何东西。

只有一个评论,如果rddApartitionedrddBpartitioned有多个动作,最好在.partitionBy后面追加.persist(),否则,所有动作都会评估整个rddApartitionedrddBpartitioned的血统,会导致hash-partitioning一再发生。

val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions)).persist()
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions)).persist()