了解火花物理计划
Understanding spark physical plan
我试图了解 spark 上的物理计划,但我不了解某些部分,因为它们看起来与传统的 rdbms 不同。例如,在下面的这个计划中,它是一个关于在配置单元 table 上查询的计划。查询是这样的:
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-09-16'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedpr#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Fl,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
+- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_exdedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedpri32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedce#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_retulag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
+- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
+- Filter (l_shipdate#37 <= 1998-09-16)
+- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], astoreRelation default, lineitem, None
我在计划中的理解是:
首先从 Hive table 扫描开始
然后使用where条件过滤
然后project得到我们想要的列
然后是TungstenAggregate?
然后TungstenExchange?
然后又是TungstenAggregate?
然后ConvertToSafe?
然后对最终结果进行排序
但是我不理解第 4、5、6 和 7 个步骤。你知道它们是什么吗?我正在查找有关此的信息,以便了解该计划,但我没有找到任何具体内容。
Tungsten是Spark从1.4开始新的内存引擎,在JVM之外管理数据,节省一些GC开销。您可以想象这样做涉及从 JVM 复制数据和向 JVM 复制数据。而已。在 Spark 1.5 中,您可以通过 spark.sql.tungsten.enabled
关闭 Tungsten,然后您将看到 "old" 计划,在 Spark 1.6 中,我认为您不能再将其关闭。
让我们看看您使用的 SQL 查询的结构:
SELECT
... -- not aggregated columns #1
... -- aggregated columns #2
FROM
... -- #3
WHERE
... -- #4
GROUP BY
... -- #5
ORDER BY
... -- #6
如您所料:
Filter (...)
对应于 WHERE
子句中的谓词 (#4
)
Project ...
将列数限制为 (#1
和 #2
的并集所需的列数,如果不存在于 #4
/ #6
SELECT
)
HiveTableScan
对应于 FROM
子句 (#3
)
其余部分归纳如下:
来自 SELECT
子句的 #2
- TungstenAggregates
中的 functions
字段
GROUP BY
子句 (#5
):
TungstenExchange
/散列分区
key
字段 TungstenAggregates
#6
- ORDER BY
子句。
Tungsten 项目总体上描述了 Spark DataFrames
(-sets
) 使用的一组优化,包括:
- 使用
sun.misc.Unsafe
的显式内存管理。这意味着 "native"(堆外)内存使用和 GC 管理之外的显式内存分配/释放。这些转换对应于执行计划中的 ConvertToUnsafe
/ ConvertToSafe
个步骤。您可以从 Understanding sun.misc.Unsafe 了解一些关于不安全的有趣细节
- 代码生成 - 不同的元编程技巧旨在生成在编译期间优化得更好的代码。你可以把它想象成一个内部的 Spark 编译器,它可以做一些事情,比如将好的功能代码重写成丑陋的 for 循环。
您可以从 Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter 提供的一些代码生成示例中了解有关 Tungsten 的更多信息。
TungstenAggregate
出现两次,因为数据首先在每个分区上本地聚合,然后进行混洗,最后合并。如果你熟悉 RDD API 这个过程大致相当于 reduceByKey
.
如果执行计划不明确,您也可以尝试将结果 DataFrame
转换为 RDD
并分析 toDebugString
.
的输出
我试图了解 spark 上的物理计划,但我不了解某些部分,因为它们看起来与传统的 rdbms 不同。例如,在下面的这个计划中,它是一个关于在配置单元 table 上查询的计划。查询是这样的:
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-09-16'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedpr#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Fl,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
+- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_exdedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedpri32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedce#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_retulag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
+- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
+- Filter (l_shipdate#37 <= 1998-09-16)
+- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], astoreRelation default, lineitem, None
我在计划中的理解是:
首先从 Hive table 扫描开始
然后使用where条件过滤
然后project得到我们想要的列
然后是TungstenAggregate?
然后TungstenExchange?
然后又是TungstenAggregate?
然后ConvertToSafe?
然后对最终结果进行排序
但是我不理解第 4、5、6 和 7 个步骤。你知道它们是什么吗?我正在查找有关此的信息,以便了解该计划,但我没有找到任何具体内容。
Tungsten是Spark从1.4开始新的内存引擎,在JVM之外管理数据,节省一些GC开销。您可以想象这样做涉及从 JVM 复制数据和向 JVM 复制数据。而已。在 Spark 1.5 中,您可以通过 spark.sql.tungsten.enabled
关闭 Tungsten,然后您将看到 "old" 计划,在 Spark 1.6 中,我认为您不能再将其关闭。
让我们看看您使用的 SQL 查询的结构:
SELECT
... -- not aggregated columns #1
... -- aggregated columns #2
FROM
... -- #3
WHERE
... -- #4
GROUP BY
... -- #5
ORDER BY
... -- #6
如您所料:
Filter (...)
对应于WHERE
子句中的谓词 (#4
)Project ...
将列数限制为 (#1
和#2
的并集所需的列数,如果不存在于#4
/#6
SELECT
)HiveTableScan
对应于FROM
子句 (#3
)
其余部分归纳如下:
-
来自
#2
-TungstenAggregates
中的 GROUP BY
子句 (#5
):TungstenExchange
/散列分区key
字段TungstenAggregates
#6
-ORDER BY
子句。
SELECT
子句的 functions
字段
Tungsten 项目总体上描述了 Spark DataFrames
(-sets
) 使用的一组优化,包括:
- 使用
sun.misc.Unsafe
的显式内存管理。这意味着 "native"(堆外)内存使用和 GC 管理之外的显式内存分配/释放。这些转换对应于执行计划中的ConvertToUnsafe
/ConvertToSafe
个步骤。您可以从 Understanding sun.misc.Unsafe 了解一些关于不安全的有趣细节
- 代码生成 - 不同的元编程技巧旨在生成在编译期间优化得更好的代码。你可以把它想象成一个内部的 Spark 编译器,它可以做一些事情,比如将好的功能代码重写成丑陋的 for 循环。
您可以从 Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter 提供的一些代码生成示例中了解有关 Tungsten 的更多信息。
TungstenAggregate
出现两次,因为数据首先在每个分区上本地聚合,然后进行混洗,最后合并。如果你熟悉 RDD API 这个过程大致相当于 reduceByKey
.
如果执行计划不明确,您也可以尝试将结果 DataFrame
转换为 RDD
并分析 toDebugString
.