Spark 分区器的使用

Use of partitioners in Spark

嗨,我有一个关于 Spark 分区的问题,在 Learning Spark 一书中,作者说分区很有用,例如在 PageRank 在第 66 页,他们写道:

since links is a static dataset, we partition it at the start with partitionBy(), so that it does not need to be shuffled across the network

现在我关注的是这个例子,但我的问题很笼统:

  1. 为什么分区的RDD不需要洗牌?
  2. PartitionBy() 是一个广泛的转换,所以它无论如何都会产生随机播放,对吗?
  3. 有人可以举例说明当 partitionBy 发生时每个节点发生了什么吗?

提前致谢

Why a partitioned RDD doesn't need to be shuffled?

当作者这样做时:

val links = sc.objectFile[(String, Seq[String])]("links")
 .partitionBy(new HashPartitioner(100))
 .persist()

他将数据集分成 100 个分区,其中每个键都将散列到给定的分区(给定示例中的 pageId)。这意味着相同的密钥将存储在 单个给定分区 中。然后,当他执行 join:

val contributions = links.join(ranks)

具有相同 pageId 的所有数据块应该已经位于同一个执行器上,避免在集群中的不同节点之间进行洗牌。

PartitionBy() is a wide transformation,so it will produce shuffle anyway, right?

是的,partitionBy 产生一个 ShuffleRDD[K, V, V]:

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}

Could someone illustrate a concrete example and what happen into each single node when partitionBy happens?

基本上,partitionBy 将执行以下操作:

它将以分区数(在本例中为 100)为模对密钥进行哈希处理,并且由于它依赖于相同的密钥始终会产生相同的哈希码这一事实,因此它将打包来自给定 ID 的所有数据(在我们的例子中,pageId) 到同一个分区,这样当你 join 时,所有数据都将在该分区中可用,避免了随机播放的需要。