单个应用程序可以有多少个 SparkSession?
How many SparkSessions can a single application have?
我发现随着 Spark 运行 和表的大小(通过连接)的增长,spark 执行器最终会 运行 内存不足,整个系统崩溃。即使我尝试将临时结果写入 Hive 表(在 HDFS 上),系统仍然没有释放太多内存,并且我的整个系统在大约 130 次连接后崩溃。
但是,通过实验,我意识到如果我将问题分解成更小的部分,将临时结果写入配置单元表,Stop/Start Spark 会话(和 spark 上下文),那么系统的资源就会被释放.我能够使用这种方法加入 1,000 多个列。
但我找不到任何文档来了解这是否被认为是一种好的做法(我知道你不应该一次获得多个会话)。大多数系统在开始时获取会话并在最后关闭它。我还可以将应用程序分解成更小的应用程序,并使用像 Oozie 这样的驱动程序在 Yarn 上调度这些更小的应用程序。但是这种方法会在每个阶段启动和停止 JVM,这似乎有点重量级。
所以我的问题是:在单个 spark 应用程序的 运行 期间持续 start/stop spark 会话以释放系统资源是不好的做法吗?
但是您能否详细说明单个 JVM 上的单个 SparkContext 是什么意思?我可以调用 sparkSession.sparkContext().stop()
,也可以调用 stop
SparkSession
。然后我创建了一个新的 SparkSession
并使用了一个新的 sparkContext
。没有抛出任何错误。
我也可以在 JavaSparkPi
上使用它,没有任何问题。
我已经在 yarn-client
和 local
spark 安装中对此进行了测试。
停止 spark 上下文究竟有什么作用,为什么在停止后不能创建一个新的?
TL;DR 您可以根据需要拥有任意数量的 SparkSession
。
您可以在单个 JVM 上有一个且只有一个 SparkContext
,但是 SparkSession
的数量几乎没有限制。
But can you elaborate on what you mean by a single SparkContext on a single JVM?
这意味着在 Spark 应用程序的生命周期中的任何给定时间,驱动程序只能是一个且只有一个,这反过来意味着该 JVM 上只有一个 SparkContext
可用。
Spark 应用程序的驱动程序是 SparkContext
所在的位置(或者相反,而不是 SparkContext
定义驱动程序的位置——区别非常模糊)。
您一次只能拥有一个 SparkContext
。虽然你可以根据需要多次启动和停止它,但我记得一个关于它的问题说你不应该关闭 SparkContext
除非你完成了 Spark(这通常发生在你的最后Spark 应用程序)。
换句话说,在您的 Spark 应用程序的整个生命周期中只有一个 SparkContext
。
有一个类似的问题 关于多个 SparkSession
可以更清楚地说明为什么你想要有两个或更多会话。
I was able call sparkSession.sparkContext().stop()
, and also stop
the SparkSession
.
所以?!这怎么和我说的矛盾了?!您停止了 JVM 上唯一可用的 SparkContext
。没什么大不了的。可以,但这只是 "you can only have one and only one SparkContext
on a single JVM available" 的一部分,不是吗?
SparkSession
只是 SparkContext
的包装器,用于在 Spark Core 的 RDD 之上提供 Spark SQL 的 structured/SQL 功能。
从 Spark SQL 开发人员的角度来看,SparkSession
的目的是成为查询实体(例如您的查询使用的 table、视图或函数的命名空间(作为数据帧、数据集或 SQL) 和 Spark 属性(每个 SparkSession
可能具有不同的值)。
如果您希望将相同的(临时)table 名称用于不同的数据集,创建两个 SparkSession
将是我认为推荐的方式。
我刚刚制作了一个示例来展示全阶段代码生成在 Spark 中的工作方式 SQL 并创建了以下内容以简单地关闭该功能。
// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
+- LocalTableScan [_1#88, _2#89, _3#90]
// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)
scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2
// Let's see what's the initial value is
// Note that I use spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100
import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
+- LocalTableScan [_1#121, _2#122, _3#123]
I then created a new SparkSession
and used a new SparkContext
. No error was thrown.
同样,这与我所说的关于单个 SparkContext
可用的说法有何矛盾?我很好奇。
What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?
您不能再将它用于 运行 Spark 作业(处理大型和分布式数据集),这几乎正是您首先使用 Spark 的原因,不是吗?
尝试以下操作:
- 停止
SparkContext
- 使用 Spark Core 的 RDD 或 Spark SQL 的数据集 API 执行任何处理
异常?正确的!请记住,您已将 "doors" 关闭到 Spark,所以您怎么能想到会在里面呢?! :)
我发现随着 Spark 运行 和表的大小(通过连接)的增长,spark 执行器最终会 运行 内存不足,整个系统崩溃。即使我尝试将临时结果写入 Hive 表(在 HDFS 上),系统仍然没有释放太多内存,并且我的整个系统在大约 130 次连接后崩溃。
但是,通过实验,我意识到如果我将问题分解成更小的部分,将临时结果写入配置单元表,Stop/Start Spark 会话(和 spark 上下文),那么系统的资源就会被释放.我能够使用这种方法加入 1,000 多个列。
但我找不到任何文档来了解这是否被认为是一种好的做法(我知道你不应该一次获得多个会话)。大多数系统在开始时获取会话并在最后关闭它。我还可以将应用程序分解成更小的应用程序,并使用像 Oozie 这样的驱动程序在 Yarn 上调度这些更小的应用程序。但是这种方法会在每个阶段启动和停止 JVM,这似乎有点重量级。
所以我的问题是:在单个 spark 应用程序的 运行 期间持续 start/stop spark 会话以释放系统资源是不好的做法吗?
但是您能否详细说明单个 JVM 上的单个 SparkContext 是什么意思?我可以调用 sparkSession.sparkContext().stop()
,也可以调用 stop
SparkSession
。然后我创建了一个新的 SparkSession
并使用了一个新的 sparkContext
。没有抛出任何错误。
我也可以在 JavaSparkPi
上使用它,没有任何问题。
我已经在 yarn-client
和 local
spark 安装中对此进行了测试。
停止 spark 上下文究竟有什么作用,为什么在停止后不能创建一个新的?
TL;DR 您可以根据需要拥有任意数量的 SparkSession
。
您可以在单个 JVM 上有一个且只有一个 SparkContext
,但是 SparkSession
的数量几乎没有限制。
But can you elaborate on what you mean by a single SparkContext on a single JVM?
这意味着在 Spark 应用程序的生命周期中的任何给定时间,驱动程序只能是一个且只有一个,这反过来意味着该 JVM 上只有一个 SparkContext
可用。
Spark 应用程序的驱动程序是 SparkContext
所在的位置(或者相反,而不是 SparkContext
定义驱动程序的位置——区别非常模糊)。
您一次只能拥有一个 SparkContext
。虽然你可以根据需要多次启动和停止它,但我记得一个关于它的问题说你不应该关闭 SparkContext
除非你完成了 Spark(这通常发生在你的最后Spark 应用程序)。
换句话说,在您的 Spark 应用程序的整个生命周期中只有一个 SparkContext
。
有一个类似的问题 SparkSession
可以更清楚地说明为什么你想要有两个或更多会话。
I was able call
sparkSession.sparkContext().stop()
, and alsostop
theSparkSession
.
所以?!这怎么和我说的矛盾了?!您停止了 JVM 上唯一可用的 SparkContext
。没什么大不了的。可以,但这只是 "you can only have one and only one SparkContext
on a single JVM available" 的一部分,不是吗?
SparkSession
只是 SparkContext
的包装器,用于在 Spark Core 的 RDD 之上提供 Spark SQL 的 structured/SQL 功能。
从 Spark SQL 开发人员的角度来看,SparkSession
的目的是成为查询实体(例如您的查询使用的 table、视图或函数的命名空间(作为数据帧、数据集或 SQL) 和 Spark 属性(每个 SparkSession
可能具有不同的值)。
如果您希望将相同的(临时)table 名称用于不同的数据集,创建两个 SparkSession
将是我认为推荐的方式。
我刚刚制作了一个示例来展示全阶段代码生成在 Spark 中的工作方式 SQL 并创建了以下内容以简单地关闭该功能。
// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
+- LocalTableScan [_1#88, _2#89, _3#90]
// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)
scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2
// Let's see what's the initial value is
// Note that I use spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100
import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
+- LocalTableScan [_1#121, _2#122, _3#123]
I then created a new
SparkSession
and used a newSparkContext
. No error was thrown.
同样,这与我所说的关于单个 SparkContext
可用的说法有何矛盾?我很好奇。
What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?
您不能再将它用于 运行 Spark 作业(处理大型和分布式数据集),这几乎正是您首先使用 Spark 的原因,不是吗?
尝试以下操作:
- 停止
SparkContext
- 使用 Spark Core 的 RDD 或 Spark SQL 的数据集 API 执行任何处理
异常?正确的!请记住,您已将 "doors" 关闭到 Spark,所以您怎么能想到会在里面呢?! :)