Spark 中的默认分区方案

Default Partitioning Scheme in Spark

当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist()
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22

scala> rdd.partitions.size
res9: Int = 10

scala> rdd.partitioner.isDefined
res10: Boolean = true


scala> rdd.partitioner.get
res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a

说有10个分区,分区是用HashPartitioner完成的。但是当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4)
...
scala> rdd.partitions.size
res6: Int = 4
scala> rdd.partitioner.isDefined
res8: Boolean = false

说有4个分区,partitioner没有定义。那么,Spark 中的默认分区方案是什么? / 在第二种情况下数据如何分区?

你必须区分两种不同的东西:

  • 分区是根据仅限于 PairwiseRDDs (RDD[(T, U)]) 的键值在分区之间分配数据。这在分区和可以在给定分区上找到的键集之间创建了关系。
  • 分区将输入分成多个分区,其中数据被简单地分成包含连续记录的块,以实现分布式计算。确切的逻辑取决于特定的来源,但它要么是记录的数量,要么是块的大小。

    parallelize 的情况下,数据使用索引在分区之间均匀分布。在 HadoopInputFormats(如 textFile)的情况下,它取决于 mapreduce.input.fileinputformat.split.minsize / mapreduce.input.fileinputformat.split.maxsize.

  • 等属性

所以默认的分区方案就是none,因为分区并不适用于所有的RDD。对于需要在 PairwiseRDDaggregateByKeyreduceByKey 等)上进行分区的操作,默认方法是使用哈希分区。