在 dataframe 中使用 dropDuplicates 会导致分区号发生变化

Using dropDuplicates in dataframe causes changes in the partition number

我创建了一个包含 800 个分区的大型数据框。

df.rdd.getNumPartitions()
800

当我在数据帧上使用 dropDuplicates 时,它将分区更改为默认的 200

df = df.dropDuplicates()
df.rdd.getNumPartitions()
200

这种行为给我带来了问题,因为它会导致内存不足。

你对解决这个问题有什么建议吗?我尝试将 spark.sql.shuffle.partition 设置为 800,但它不起作用。谢谢

我在

找到了解决方案

使用 reduceByKey 而不是 dropDuplicates。 reduceByKey 还可以选择为最终的 rdd 指定分区数。

在这种情况下使用 reduceByKey 的缺点是它很慢。

发生这种情况是因为 dropDuplicates 需要随机播放。如果你想获得特定数量的分区,你应该设置 spark.sql.shuffle.partitions (它的默认值为 200)

df = sc.parallelize([("a", 1)]).toDF()
df.rdd.getNumPartitions()
## 8

df.dropDuplicates().rdd.getNumPartitions()
## 200

sqlContext.setConf("spark.sql.shuffle.partitions", "800")

df.dropDuplicates().rdd.getNumPartitions()
## 800

另一种方法 (Spark 1.6+) 是先重新分区:

df.repartition(801, *df.columns).dropDuplicates().rdd.getNumPartitions()
## 801

它稍微更灵活但效率较低,因为它不执行本地聚合。