"Exchange hashpartitioning" 如何在 spark 中工作
how is "Exchange hashpartitioning" working in spark
我有一个数据集,我想将其写入 parquet 文件中,以便之后通过 Spark 请求这些文件,包括谓词下推。
目前我使用按列重新分区和分区数将数据移动到特定分区。该列标识相应的分区(从 0 开始到(固定的)n)。结果是 scala/spark 生成了意外结果并创建了更少的分区(其中一些是空的)。也许是哈希冲突?
为了解决这个问题,我试图找出原因并试图找到解决方法。我通过将数据帧转换为 rdd 并将 partitionBy 与 HashPartitioner 一起使用找到了一种解决方法。令我惊讶的是:我得到了预期的结果。但是将dataframe转换为RDD对我来说不是一个解决方案,因为它占用了太多资源。
我已经在
上测试了这个环境
cloudera CDH 5.9.3 上的 SPARK 2.0
emr-5.17.0 上的 SPARK 2.3.1
这是我的输出测试。请使用Spark-shell来运行他们
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val mydataindex = Array(0,1, 2, 3,4)
mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)
scala> val mydata = sc.parallelize(for {
| x <- mydataindex
| y <- Array(123,456,789)
| } yield (x, y), 100)
mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26
scala> val rddMyDataPartitions = rddMyData.mapPartitionsWithIndex{
| (index, iterator) => {
| val myList = iterator.toList
| myList.map(x => x + " -> " + index).iterator
| }
| }
rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26
scala>
| // this is expected:
scala> rddMyDataPartitions.take(100)
res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)
scala> val dfMyData = mydata.toDF()
dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]
scala> dfMyDataRepartitioned.explain(false)
== Physical Plan ==
Exchange hashpartitioning(_1#3, 5)
+- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
+- Scan ExternalRDDScan[obj#2]
scala> val dfMyDataRepartitionedPartition = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]
scala> // this is unexpected, because 1 partition has more indexes
scala> dfMyDataRepartitionedPartition.show()
+------------+-----+
|partition_id|count|
+------------+-----+
| 1| 6|
| 3| 3|
| 4| 3|
| 2| 3|
+------------+-----+
我首先认为 HashPartitioner 被用于重新分区数据帧的方法,但事实并非如此,因为它正在处理 RDD。
谁能指导我这个 "Exchange hashpartitioning"(请参阅上面的解释输出)是如何工作的?
2019-01-16 12:20:这不是的副本,因为我对整数列上按列(+数字分区)重新分区的哈希算法感兴趣。正如您在源代码中看到的那样,通用 HashPartitioner 正在按预期工作。
这里没有什么意外。如 中所述,Spark 使用散列(键)模数分区和非均匀分布,尤其是在小型数据集上并非意外。
Dataset
和 RDD
之间的差异也在意料之中,因为两者使用不同的哈希函数(同上)。
终于
The result is that scala/spark is generating an unexpected result and creating less partitions
不是正确的观察。创建的分区数正是要求的
scala> dfMyDataRepartitioned.rdd.getNumPartitions
res8: Int = 5
但是空值在聚合中是不可见的,因为没有相应的值。
我有一个数据集,我想将其写入 parquet 文件中,以便之后通过 Spark 请求这些文件,包括谓词下推。
目前我使用按列重新分区和分区数将数据移动到特定分区。该列标识相应的分区(从 0 开始到(固定的)n)。结果是 scala/spark 生成了意外结果并创建了更少的分区(其中一些是空的)。也许是哈希冲突?
为了解决这个问题,我试图找出原因并试图找到解决方法。我通过将数据帧转换为 rdd 并将 partitionBy 与 HashPartitioner 一起使用找到了一种解决方法。令我惊讶的是:我得到了预期的结果。但是将dataframe转换为RDD对我来说不是一个解决方案,因为它占用了太多资源。
我已经在
上测试了这个环境cloudera CDH 5.9.3 上的 SPARK 2.0
emr-5.17.0 上的 SPARK 2.3.1
这是我的输出测试。请使用Spark-shell来运行他们
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val mydataindex = Array(0,1, 2, 3,4)
mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)
scala> val mydata = sc.parallelize(for {
| x <- mydataindex
| y <- Array(123,456,789)
| } yield (x, y), 100)
mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26
scala> val rddMyDataPartitions = rddMyData.mapPartitionsWithIndex{
| (index, iterator) => {
| val myList = iterator.toList
| myList.map(x => x + " -> " + index).iterator
| }
| }
rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26
scala>
| // this is expected:
scala> rddMyDataPartitions.take(100)
res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)
scala> val dfMyData = mydata.toDF()
dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]
scala> dfMyDataRepartitioned.explain(false)
== Physical Plan ==
Exchange hashpartitioning(_1#3, 5)
+- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
+- Scan ExternalRDDScan[obj#2]
scala> val dfMyDataRepartitionedPartition = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]
scala> // this is unexpected, because 1 partition has more indexes
scala> dfMyDataRepartitionedPartition.show()
+------------+-----+
|partition_id|count|
+------------+-----+
| 1| 6|
| 3| 3|
| 4| 3|
| 2| 3|
+------------+-----+
我首先认为 HashPartitioner 被用于重新分区数据帧的方法,但事实并非如此,因为它正在处理 RDD。
谁能指导我这个 "Exchange hashpartitioning"(请参阅上面的解释输出)是如何工作的?
2019-01-16 12:20:这不是
这里没有什么意外。如
Dataset
和 RDD
之间的差异也在意料之中,因为两者使用不同的哈希函数(同上)。
终于
The result is that scala/spark is generating an unexpected result and creating less partitions
不是正确的观察。创建的分区数正是要求的
scala> dfMyDataRepartitioned.rdd.getNumPartitions
res8: Int = 5
但是空值在聚合中是不可见的,因为没有相应的值。