如何跨分区平衡我的数据?
How to balance my data across the partitions?
编辑:答案有帮助,但我在以下位置描述了我的解决方案:memoryOverhead issue in Spark。
我有一个202092个分区的RDD,它读取一个别人创建的数据集。我可以手动看到分区之间的数据不平衡,例如其中一些有 0 个图像,其他有 4k,而平均值为 432。处理数据时,我收到此错误:
Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
而 memoryOverhead 已经提升。我觉得一些尖峰正在发生,这让 Yarn 杀死了我的容器,因为那个尖峰溢出了指定的边界。
那么我应该怎么做才能确保我的数据(大致)平衡跨分区?
我的想法是 repartition() 会起作用,它会调用洗牌:
dataset = dataset.repartition(202092)
但是尽管有 programming-guide 的说明,我还是得到了同样的错误:
repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer
partitions and balance it across them. This always shuffles all data
over the network.
尽管检查我的玩具示例:
data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000))
d = data.glom().collect()
len(d[0]) # 1000
len(d[1]) # 2000
len(d[2]) # 3000
repartitioned_data = data.repartition(3)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 1854
len(re_d[1]) # 1754
len(re_d[2]) # 2392
repartitioned_data = data.repartition(6)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 422
len(re_d[1]) # 845
len(re_d[2]) # 1643
len(re_d[3]) # 1332
len(re_d[4]) # 1547
len(re_d[5]) # 211
repartitioned_data = data.repartition(12)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 132
len(re_d[1]) # 265
len(re_d[2]) # 530
len(re_d[3]) # 1060
len(re_d[4]) # 1025
len(re_d[5]) # 145
len(re_d[6]) # 290
len(re_d[7]) # 580
len(re_d[8]) # 1113
len(re_d[9]) # 272
len(re_d[10]) # 522
len(re_d[11]) # 66
我认为内存开销超出限制的问题是由于在获取期间使用了 DirectMemory 缓冲区。我认为它在 2.0.0 中已修复。 (我们遇到了同样的问题,但是当我们发现升级到 2.0.0 解决了它时,我们停止了更深入的挖掘。不幸的是,我没有 Spark 问题编号来支持我。)
repartition
之后的分区不均匀令人惊讶。对比https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443。 Spark 甚至在 repartition
中生成随机密钥,因此它不会使用可能有偏差的散列来完成。
我尝试了您的示例,并获得了与 Spark 1.6.2 和 Spark 2.0.0 完全 相同的结果。但不是来自 Scala spark-shell
:
scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24
scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)
scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)
scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)
scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)
好漂亮的分区!
(抱歉,这不是一个完整的答案。我只是想分享我到目前为止的发现。)
编辑:答案有帮助,但我在以下位置描述了我的解决方案:memoryOverhead issue in Spark。
我有一个202092个分区的RDD,它读取一个别人创建的数据集。我可以手动看到分区之间的数据不平衡,例如其中一些有 0 个图像,其他有 4k,而平均值为 432。处理数据时,我收到此错误:
Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
而 memoryOverhead 已经提升。我觉得一些尖峰正在发生,这让 Yarn 杀死了我的容器,因为那个尖峰溢出了指定的边界。
那么我应该怎么做才能确保我的数据(大致)平衡跨分区?
我的想法是 repartition() 会起作用,它会调用洗牌:
dataset = dataset.repartition(202092)
但是尽管有 programming-guide 的说明,我还是得到了同样的错误:
repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
尽管检查我的玩具示例:
data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000))
d = data.glom().collect()
len(d[0]) # 1000
len(d[1]) # 2000
len(d[2]) # 3000
repartitioned_data = data.repartition(3)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 1854
len(re_d[1]) # 1754
len(re_d[2]) # 2392
repartitioned_data = data.repartition(6)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 422
len(re_d[1]) # 845
len(re_d[2]) # 1643
len(re_d[3]) # 1332
len(re_d[4]) # 1547
len(re_d[5]) # 211
repartitioned_data = data.repartition(12)
re_d = repartitioned_data.glom().collect()
len(re_d[0]) # 132
len(re_d[1]) # 265
len(re_d[2]) # 530
len(re_d[3]) # 1060
len(re_d[4]) # 1025
len(re_d[5]) # 145
len(re_d[6]) # 290
len(re_d[7]) # 580
len(re_d[8]) # 1113
len(re_d[9]) # 272
len(re_d[10]) # 522
len(re_d[11]) # 66
我认为内存开销超出限制的问题是由于在获取期间使用了 DirectMemory 缓冲区。我认为它在 2.0.0 中已修复。 (我们遇到了同样的问题,但是当我们发现升级到 2.0.0 解决了它时,我们停止了更深入的挖掘。不幸的是,我没有 Spark 问题编号来支持我。)
repartition
之后的分区不均匀令人惊讶。对比https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443。 Spark 甚至在 repartition
中生成随机密钥,因此它不会使用可能有偏差的散列来完成。
我尝试了您的示例,并获得了与 Spark 1.6.2 和 Spark 2.0.0 完全 相同的结果。但不是来自 Scala spark-shell
:
scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24
scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)
scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)
scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)
scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)
好漂亮的分区!
(抱歉,这不是一个完整的答案。我只是想分享我到目前为止的发现。)