单个应用程序可以有多少个 SparkSession?

How many SparkSessions can a single application have?

我发现随着 Spark 运行 和表的大小(通过连接)的增长,spark 执行器最终会 运行 内存不足,整个系统崩溃。即使我尝试将临时结果写入 Hive 表(在 HDFS 上),系统仍然没有释放太多内存,并且我的整个系统在大约 130 次连接后崩溃。

但是,通过实验,我意识到如果我将问题分解成更小的部分,将临时结果写入配置单元表,Stop/Start Spark 会话(和 spark 上下文),那么系统的资源就会被释放.我能够使用这种方法加入 1,000 多个列。

但我找不到任何文档来了解这是否被认为是一种好的做法(我知道你不应该一次获得多个会话)。大多数系统在开始时获取会话并在最后关闭它。我还可以将应用程序分解成更小的应用程序,并使用像 Oozie 这样的驱动程序在 Yarn 上调度这些更小的应用程序。但是这种方法会在每个阶段启动和停止 JVM,这似乎有点重量级。

所以我的问题是:在单个 spark 应用程序的 运行 期间持续 start/stop spark 会话以释放系统资源是不好的做法吗?


但是您能否详细说明单个 JVM 上的单个 SparkContext 是什么意思?我可以调用 sparkSession.sparkContext().stop(),也可以调用 stop SparkSession。然后我创建了一个新的 SparkSession 并使用了一个新的 sparkContext。没有抛出任何错误。

我也可以在 JavaSparkPi 上使用它,没有任何问题。

我已经在 yarn-clientlocal spark 安装中对此进行了测试。

停止 spark 上下文究竟有什么作用,为什么在停止后不能创建一个新的?

TL;DR 您可以根据需要拥有任意数量的 SparkSession

您可以在单个 JVM 上有一个且只有一个 SparkContext,但是 SparkSession 的数量几乎没有限制。

But can you elaborate on what you mean by a single SparkContext on a single JVM?

这意味着在 Spark 应用程序的生命周期中的任何给定时间,驱动程序只能是一个且只有一个,这反过来意味着该 JVM 上只有一个 SparkContext 可用。

Spark 应用程序的驱动程序是 SparkContext 所在的位置(或者相反,而不是 SparkContext 定义驱动程序的位置——区别非常模糊)。

您一次只能拥有一个 SparkContext。虽然你可以根据需要多次启动和停止它,但我记得一个关于它的问题说你不应该关闭 SparkContext 除非你完成了 Spark(这通常发生在你的最后Spark 应用程序)。

换句话说,在您的 Spark 应用程序的整个生命周期中只有一个 SparkContext

有一个类似的问题 关于多个 SparkSession 可以更清楚地说明为什么你想要有两个或更多会话。

I was able call sparkSession.sparkContext().stop(), and also stop the SparkSession.

所以?!这怎么和我说的矛盾了?!您停止了 JVM 上唯一可用的 SparkContext。没什么大不了的。可以,但这只是 "you can only have one and only one SparkContext on a single JVM available" 的一部分,不是吗?

SparkSession 只是 SparkContext 的包装器,用于在 Spark Core 的 RDD 之上提供 Spark SQL 的 structured/SQL 功能。

从 Spark SQL 开发人员的角度来看,SparkSession 的目的是成为查询实体(例如您的查询使用的 table、视图或函数的命名空间(作为数据帧、数据集或 SQL) 和 Spark 属性(每个 SparkSession 可能具有不同的值)。

如果您希望将相同的(临时)table 名称用于不同的数据集,创建两个 SparkSession 将是我认为推荐的方式。

我刚刚制作了一个示例来展示全阶段代码生成在 Spark 中的工作方式 SQL 并创建了以下内容以简单地关闭该功能。

// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
   +- LocalTableScan [_1#88, _2#89, _3#90]

// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)

scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2

// Let's see what's the initial value is
// Note that I use spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100

import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)

// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
   +- LocalTableScan [_1#121, _2#122, _3#123]

I then created a new SparkSession and used a new SparkContext. No error was thrown.

同样,这与我所说的关于单个 SparkContext 可用的说法有何矛盾?我很好奇。

What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?

您不能再将它用于 运行 Spark 作业(处理大型和分布式数据集),这几乎正是您首先使用 Spark 的原因,不是吗?

尝试以下操作:

  1. 停止SparkContext
  2. 使用 Spark Core 的 RDD 或 Spark SQL 的数据集 API 执行任何处理

异常?正确的!请记住,您已将 "doors" 关闭到 Spark,所以您怎么能想到会在里面呢?! :)