Why is adaptive SQL not working with df persist?
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("Test")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m")
  .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024")
  .getOrCreate()

val df = spark.read.csv("<Input File Path>")
val df1 = df.distinct()
df1.persist() // Removing this line makes the code work as expected
df1.write.csv("<Output File Path>")
I have an input file of 2 GB that is read as 16 partitions of 128 MB each. I have enabled adaptive SQL to coalesce partitions after the shuffle.

Without df1.persist, df1.write.csv writes 4 partition files of about 50 MB each, which is expected.
Without persist
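The partition counts above come from the number of part files Spark writes. A minimal sketch for counting them (assuming the output goes to a local directory; the path placeholder is the same one used in the snippet above):

import java.nio.file.{Files, Paths}

// Count the "part-" files in the output directory; with AQE coalescing this
// should be 4 files of roughly 50 MB each for the input described above.
val partFiles = Files.list(Paths.get("<Output File Path>"))
  .filter(p => p.getFileName.toString.startsWith("part-"))
  .count()
println(s"part files written: $partFiles")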
If I include df1.persist, Spark writes 200 partitions (the adaptive coalescing does not kick in).
With persist
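A quick way to see that caching is what changes the behaviour (a sketch, not from the original post; the exact explain output is version-dependent) is to compare the physical plans before and after persisting:

val df1 = spark.read.csv("<Input File Path>").distinct()
df1.explain()   // plan with the shuffle introduced by distinct()

df1.persist()
df1.explain()   // the same query now typically scans an InMemoryRelation (the cached plan)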
.config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
Adding this config works:
https://issues.apache.org/jira/projects/SPARK/issues/SPARK-38172?filter=reportedbyme
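For reference, a sketch of the same session builder with that flag added (assuming a Spark version where the config is available; everything else is unchanged from the snippet above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("Test")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m")
  .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024")
  // Allow AQE to change the output partitioning of cached plans,
  // so the shuffle behind persist() can still be coalesced.
  .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
  .getOrCreate()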