Spark:显式缓存会干扰 Catalyst 优化器优化某些查询的能力吗?
Spark: Explicit caching can interfere with Catalyst optimizer's ability to optimize some queries?
我正在学习将数据积木用于 spark 认证考试,他们的模拟考试(请参阅> https://databricks-prod-cloudfront.cloud.databricks.com/public/793177bc53e528530b06c78a4fa0e086/0/6221173/100020/latest.html)要求我们接受以下陈述作为真实事实:
"Explicit caching can decrease application performance by interfering
with the Catalyst optimizer's ability to optimize some queries"
尽管我已经阅读了很多关于催化剂的资料并且对细节有很好的了解,但我还是答错了这个问题。所以我想巩固我对这个主题的了解,并找到解释这个断言背后的方式和原因的来源。
任何人都可以提供这方面的指导吗?具体来说,为什么会这样?以及我们如何确保当我们缓存我们的数据集时,我们实际上不会妨碍优化器并使事情变得更糟? /谢谢!
缓存如何以及为什么会降低性能?
让我们用一个简单的例子来证明:
// Some data
val df = spark.range(100)
df.join(df, Seq("id")).filter('id <20).explain(true)
在这里,催化剂计划将通过在加入之前对每个数据帧进行过滤来优化此加入,以减少将被洗牌的数据量。
== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#69L)
:- Filter (id#0L < 20)
: +- Range (0, 100, step=1, splits=Some(4))
+- Filter (id#69L < 20)
+- Range (0, 100, step=1, splits=Some(4))
如果我们在连接后缓存查询,查询将不会像我们在这里看到的那样优化:
df.join(df, Seq("id")).cache.filter('id <20).explain(true)
== Optimized Logical Plan ==
Filter (id#0L < 20)
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=4)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=4)
过滤器在最后完成...
为什么会这样?因为 cache
将数据帧写入磁盘。因此,每个后续查询都将使用这个缓存/写入磁盘的 DataFrame,因此它只会优化缓存后的查询部分。我们可以用同样的例子来验证!
df.join(df, Seq("id")).cache.join(df, Seq("id")).filter('id <20).explain(true)
== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#92L)
:- Filter (id#0L < 20)
: +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *Project [id#0L]
: +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
: :- *Range (0, 100, step=1, splits=4)
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
: +- *Range (0, 100, step=1, splits=4)
+- Filter (id#92L < 20)
+- Range (0, 100, step=1, splits=Some(4))
过滤器在第二次连接之前完成,但在第一次连接之后完成,因为它已被缓存。
如何避免?
知道你在做什么!您可以简单地比较催化剂计划,看看 Spark 缺少哪些优化。
我正在学习将数据积木用于 spark 认证考试,他们的模拟考试(请参阅> https://databricks-prod-cloudfront.cloud.databricks.com/public/793177bc53e528530b06c78a4fa0e086/0/6221173/100020/latest.html)要求我们接受以下陈述作为真实事实:
"Explicit caching can decrease application performance by interfering with the Catalyst optimizer's ability to optimize some queries"
尽管我已经阅读了很多关于催化剂的资料并且对细节有很好的了解,但我还是答错了这个问题。所以我想巩固我对这个主题的了解,并找到解释这个断言背后的方式和原因的来源。
任何人都可以提供这方面的指导吗?具体来说,为什么会这样?以及我们如何确保当我们缓存我们的数据集时,我们实际上不会妨碍优化器并使事情变得更糟? /谢谢!
缓存如何以及为什么会降低性能?
让我们用一个简单的例子来证明:
// Some data
val df = spark.range(100)
df.join(df, Seq("id")).filter('id <20).explain(true)
在这里,催化剂计划将通过在加入之前对每个数据帧进行过滤来优化此加入,以减少将被洗牌的数据量。
== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#69L)
:- Filter (id#0L < 20)
: +- Range (0, 100, step=1, splits=Some(4))
+- Filter (id#69L < 20)
+- Range (0, 100, step=1, splits=Some(4))
如果我们在连接后缓存查询,查询将不会像我们在这里看到的那样优化:
df.join(df, Seq("id")).cache.filter('id <20).explain(true)
== Optimized Logical Plan ==
Filter (id#0L < 20)
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=4)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=4)
过滤器在最后完成...
为什么会这样?因为 cache
将数据帧写入磁盘。因此,每个后续查询都将使用这个缓存/写入磁盘的 DataFrame,因此它只会优化缓存后的查询部分。我们可以用同样的例子来验证!
df.join(df, Seq("id")).cache.join(df, Seq("id")).filter('id <20).explain(true)
== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#92L)
:- Filter (id#0L < 20)
: +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *Project [id#0L]
: +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
: :- *Range (0, 100, step=1, splits=4)
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
: +- *Range (0, 100, step=1, splits=4)
+- Filter (id#92L < 20)
+- Range (0, 100, step=1, splits=Some(4))
过滤器在第二次连接之前完成,但在第一次连接之后完成,因为它已被缓存。
如何避免?
知道你在做什么!您可以简单地比较催化剂计划,看看 Spark 缺少哪些优化。