Spark 分区器的使用
Use of partitioners in Spark
嗨,我有一个关于 Spark 分区的问题,在 Learning Spark 一书中,作者说分区很有用,例如在 PageRank 在第 66 页,他们写道:
since links is a static dataset, we partition it at the start with
partitionBy(), so that it does not need to be shuffled across the
network
现在我关注的是这个例子,但我的问题很笼统:
- 为什么分区的RDD不需要洗牌?
- PartitionBy() 是一个广泛的转换,所以它无论如何都会产生随机播放,对吗?
- 有人可以举例说明当 partitionBy 发生时每个节点发生了什么吗?
提前致谢
Why a partitioned RDD doesn't need to be shuffled?
当作者这样做时:
val links = sc.objectFile[(String, Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
他将数据集分成 100 个分区,其中每个键都将散列到给定的分区(给定示例中的 pageId
)。这意味着相同的密钥将存储在 单个给定分区 中。然后,当他执行 join
:
val contributions = links.join(ranks)
具有相同 pageId
的所有数据块应该已经位于同一个执行器上,避免在集群中的不同节点之间进行洗牌。
PartitionBy() is a wide transformation,so it will produce shuffle
anyway, right?
是的,partitionBy
产生一个 ShuffleRDD[K, V, V]
:
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
Could someone illustrate a concrete example and what happen into each
single node when partitionBy happens?
基本上,partitionBy
将执行以下操作:
它将以分区数(在本例中为 100)为模对密钥进行哈希处理,并且由于它依赖于相同的密钥始终会产生相同的哈希码这一事实,因此它将打包来自给定 ID 的所有数据(在我们的例子中,pageId
) 到同一个分区,这样当你 join
时,所有数据都将在该分区中可用,避免了随机播放的需要。
嗨,我有一个关于 Spark 分区的问题,在 Learning Spark 一书中,作者说分区很有用,例如在 PageRank 在第 66 页,他们写道:
since links is a static dataset, we partition it at the start with partitionBy(), so that it does not need to be shuffled across the network
现在我关注的是这个例子,但我的问题很笼统:
- 为什么分区的RDD不需要洗牌?
- PartitionBy() 是一个广泛的转换,所以它无论如何都会产生随机播放,对吗?
- 有人可以举例说明当 partitionBy 发生时每个节点发生了什么吗?
提前致谢
Why a partitioned RDD doesn't need to be shuffled?
当作者这样做时:
val links = sc.objectFile[(String, Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
他将数据集分成 100 个分区,其中每个键都将散列到给定的分区(给定示例中的 pageId
)。这意味着相同的密钥将存储在 单个给定分区 中。然后,当他执行 join
:
val contributions = links.join(ranks)
具有相同 pageId
的所有数据块应该已经位于同一个执行器上,避免在集群中的不同节点之间进行洗牌。
PartitionBy() is a wide transformation,so it will produce shuffle anyway, right?
是的,partitionBy
产生一个 ShuffleRDD[K, V, V]
:
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
Could someone illustrate a concrete example and what happen into each single node when partitionBy happens?
基本上,partitionBy
将执行以下操作:
它将以分区数(在本例中为 100)为模对密钥进行哈希处理,并且由于它依赖于相同的密钥始终会产生相同的哈希码这一事实,因此它将打包来自给定 ID 的所有数据(在我们的例子中,pageId
) 到同一个分区,这样当你 join
时,所有数据都将在该分区中可用,避免了随机播放的需要。