Why is adaptive SQL not working with df.persist()?

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("Test")
                        .config("spark.sql.adaptive.enabled", "true")
                        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
                        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m")
                        .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
                        .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024")
                        .getOrCreate()

val df = spark.read.csv("<Input File Path>")
val df1 = df.distinct()
df1.persist() // Removing this line makes the code work as expected
df1.write.csv("<Output File Path>")

I have an input file of size 2 GB, which is read as 16 partitions of 128 MB each. I have enabled adaptive SQL (AQE) so that partitions are coalesced after the shuffle.

Without df1.persist, df1.write.csv writes 4 partition files of about 50 MB each, which is what I expect.

If I include df1.persist, Spark writes 200 partition files, i.e. adaptive coalescing does not kick in (200 is the default value of spark.sql.shuffle.partitions, so the cached plan appears to fall back to the non-adaptive shuffle partitioning).
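This can be confirmed by inspecting the persisted DataFrame before writing. The check below is my own addition, not part of the original report; it assumes the same df1 as above and uses only standard Spark APIs (explain, rdd.getNumPartitions):

df1.persist()
df1.explain()                      // with persist, the shuffle under the cached plan keeps the default 200 partitions
println(df1.rdd.getNumPartitions)  // expected to print 200 here, instead of the ~4 coalesced partitions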

.config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")

Adding this configuration fixed it. See https://issues.apache.org/jira/projects/SPARK/issues/SPARK-38172?filter=reportedbyme (SPARK-38172).
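For completeness, here is a minimal sketch of the session builder with that flag added. It simply combines the original setup with the fix; the assumption is a Spark version in which spark.sql.optimizer.canChangeCachedPlanOutputPartitioning exists (it is disabled by default):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("Test")
                        .config("spark.sql.adaptive.enabled", "true")
                        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
                        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m")
                        .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
                        .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024")
                        // allow AQE to change the output partitioning of cached (persisted) plans
                        .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
                        .getOrCreate()

val df1 = spark.read.csv("<Input File Path>").distinct()
df1.persist()                          // adaptive coalescing now also applies to the cached plan
df1.write.csv("<Output File Path>")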