在 dataframe 中使用 dropDuplicates 会导致分区号发生变化
Using dropDuplicates in dataframe causes changes in the partition number
我创建了一个包含 800 个分区的大型数据框。
df.rdd.getNumPartitions()
800
当我在数据帧上使用 dropDuplicates 时,它将分区更改为默认的 200
df = df.dropDuplicates()
df.rdd.getNumPartitions()
200
这种行为给我带来了问题,因为它会导致内存不足。
你对解决这个问题有什么建议吗?我尝试将 spark.sql.shuffle.partition 设置为 800,但它不起作用。谢谢
我在
找到了解决方案
使用 reduceByKey 而不是 dropDuplicates。 reduceByKey 还可以选择为最终的 rdd 指定分区数。
在这种情况下使用 reduceByKey 的缺点是它很慢。
发生这种情况是因为 dropDuplicates
需要随机播放。如果你想获得特定数量的分区,你应该设置 spark.sql.shuffle.partitions
(它的默认值为 200)
df = sc.parallelize([("a", 1)]).toDF()
df.rdd.getNumPartitions()
## 8
df.dropDuplicates().rdd.getNumPartitions()
## 200
sqlContext.setConf("spark.sql.shuffle.partitions", "800")
df.dropDuplicates().rdd.getNumPartitions()
## 800
另一种方法 (Spark 1.6+) 是先重新分区:
df.repartition(801, *df.columns).dropDuplicates().rdd.getNumPartitions()
## 801
它稍微更灵活但效率较低,因为它不执行本地聚合。
我创建了一个包含 800 个分区的大型数据框。
df.rdd.getNumPartitions()
800
当我在数据帧上使用 dropDuplicates 时,它将分区更改为默认的 200
df = df.dropDuplicates()
df.rdd.getNumPartitions()
200
这种行为给我带来了问题,因为它会导致内存不足。
你对解决这个问题有什么建议吗?我尝试将 spark.sql.shuffle.partition 设置为 800,但它不起作用。谢谢
我在
使用 reduceByKey 而不是 dropDuplicates。 reduceByKey 还可以选择为最终的 rdd 指定分区数。
在这种情况下使用 reduceByKey 的缺点是它很慢。
发生这种情况是因为 dropDuplicates
需要随机播放。如果你想获得特定数量的分区,你应该设置 spark.sql.shuffle.partitions
(它的默认值为 200)
df = sc.parallelize([("a", 1)]).toDF()
df.rdd.getNumPartitions()
## 8
df.dropDuplicates().rdd.getNumPartitions()
## 200
sqlContext.setConf("spark.sql.shuffle.partitions", "800")
df.dropDuplicates().rdd.getNumPartitions()
## 800
另一种方法 (Spark 1.6+) 是先重新分区:
df.repartition(801, *df.columns).dropDuplicates().rdd.getNumPartitions()
## 801
它稍微更灵活但效率较低,因为它不执行本地聚合。