Apache Spark:重新分区、排序和缓存对连接的影响
Apache Spark: impact of repartitioning, sorting and caching on a join
我正在探索 Spark 在将 table 加入自身时的行为。我正在使用 Databricks。
我的虚拟场景是:
读取外部 table 作为数据帧 A(底层文件为增量格式)
将数据框 B 定义为仅选择了某些列的数据框 A
在 column1 和 column2 上连接数据帧 A 和 B
(是的,没有多大意义,我只是在尝试了解 Spark 的底层机制)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
我的第一次尝试是 运行 代码原样(尝试 1)。然后我尝试重新分区和缓存(尝试 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
最后,我重新分区、排序和缓存
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
生成的各个dag如附件所示
我的问题是:
为什么在尝试 1 中 table 似乎已被缓存,即使未明确指定缓存。
为什么 InMemoreTableScan 后面总是跟着另一个这种类型的节点。
为什么 attempt 3 缓存出现在两个阶段?
为什么尝试 3 WholeStageCodegen 跟随一个(而且只有一个)InMemoreTableScan。
您在这 3 个计划中观察到的是 DataBricks 运行时和 Spark 的混合体。
首先,当 运行 Databricks runtime 3.3+ 时,会自动为所有 parquet 文件启用缓存。
相应的配置:
spark.databricks.io.cache.enabled true
对于您的第二个查询,InMemoryTableScan 发生了两次,因为就在调用 join 时,spark 尝试并行计算数据集 A 和数据集 B。假设不同的执行者被分配了上述任务,两者都必须从 (Databricks) 缓存中扫描 table。
对于第三个,InMemoryTableScan本身并不是指缓存。这只是意味着无论形成何种计划催化剂,都涉及多次扫描缓存 table。
PS: 我无法想象第 4 点 :)
我正在探索 Spark 在将 table 加入自身时的行为。我正在使用 Databricks。
我的虚拟场景是:
读取外部 table 作为数据帧 A(底层文件为增量格式)
将数据框 B 定义为仅选择了某些列的数据框 A
在 column1 和 column2 上连接数据帧 A 和 B
(是的,没有多大意义,我只是在尝试了解 Spark 的底层机制)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
我的第一次尝试是 运行 代码原样(尝试 1)。然后我尝试重新分区和缓存(尝试 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
最后,我重新分区、排序和缓存
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
生成的各个dag如附件所示
我的问题是:
为什么在尝试 1 中 table 似乎已被缓存,即使未明确指定缓存。
为什么 InMemoreTableScan 后面总是跟着另一个这种类型的节点。
为什么 attempt 3 缓存出现在两个阶段?
为什么尝试 3 WholeStageCodegen 跟随一个(而且只有一个)InMemoreTableScan。
您在这 3 个计划中观察到的是 DataBricks 运行时和 Spark 的混合体。
首先,当 运行 Databricks runtime 3.3+ 时,会自动为所有 parquet 文件启用缓存。
相应的配置:
spark.databricks.io.cache.enabled true
对于您的第二个查询,InMemoryTableScan 发生了两次,因为就在调用 join 时,spark 尝试并行计算数据集 A 和数据集 B。假设不同的执行者被分配了上述任务,两者都必须从 (Databricks) 缓存中扫描 table。
对于第三个,InMemoryTableScan本身并不是指缓存。这只是意味着无论形成何种计划催化剂,都涉及多次扫描缓存 table。
PS: 我无法想象第 4 点 :)