如何在加入 Spark 之前正确应用 HashPartitioner?
How to properly apply HashPartitioner before a join in Spark?
为了减少两个RDD连接时的shuffling,我决定先使用HashPartitioner对它们进行分区。我是这样做的。我这样做是否正确,或者有更好的方法吗?
val rddA = ...
val rddB = ...
val numOfPartitions = rddA.getNumPartitions
val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))
val rddAB = rddApartitioned.join(rddBpartitioned)
To reduce shuffling during the joining of two RDDs,
令人惊讶的是,重新分区会减少甚至消除洗牌是一种常见的误解。 没有。重新分区 是 shuffle 最纯粹的形式。它不会节省时间、带宽或内存。
使用主动分区程序背后的基本原理是不同的 - 它允许您洗牌一次,并重用状态,执行多个按键操作,而无需额外的洗牌(尽管作为据我所知,不一定没有额外的网络流量,,不包括在单个操作中发生随机播放的情况)。
所以你的代码是正确的,但如果你加入一次,它不会给你买任何东西。
只有一个评论,如果rddApartitioned
和rddBpartitioned
有多个动作,最好在.partitionBy
后面追加.persist()
,否则,所有动作都会评估整个rddApartitioned
和rddBpartitioned
的血统,会导致hash-partitioning一再发生。
val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions)).persist()
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions)).persist()
为了减少两个RDD连接时的shuffling,我决定先使用HashPartitioner对它们进行分区。我是这样做的。我这样做是否正确,或者有更好的方法吗?
val rddA = ...
val rddB = ...
val numOfPartitions = rddA.getNumPartitions
val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))
val rddAB = rddApartitioned.join(rddBpartitioned)
To reduce shuffling during the joining of two RDDs,
令人惊讶的是,重新分区会减少甚至消除洗牌是一种常见的误解。 没有。重新分区 是 shuffle 最纯粹的形式。它不会节省时间、带宽或内存。
使用主动分区程序背后的基本原理是不同的 - 它允许您洗牌一次,并重用状态,执行多个按键操作,而无需额外的洗牌(尽管作为据我所知,不一定没有额外的网络流量,
所以你的代码是正确的,但如果你加入一次,它不会给你买任何东西。
只有一个评论,如果rddApartitioned
和rddBpartitioned
有多个动作,最好在.partitionBy
后面追加.persist()
,否则,所有动作都会评估整个rddApartitioned
和rddBpartitioned
的血统,会导致hash-partitioning一再发生。
val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions)).persist()
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions)).persist()