使用 Spark Session 的单元测试:SparkContext 已关闭
Unit Tests using Spark Session : SparkContext was shut down
我们有一个包含多个测试套件的大项目,每个测试套件平均有 3 个测试。
对于我们的单元测试,我们使用 Spark Standalone,因此没有 Yarn 作为资源管理器。
每个测试套件:
启动 spark 会话:
implicit val spark = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
扩展 BeforeAndAfterAll
:
class MyTestsSpec extends WordSpec
with Matchers
with BeforeAndAfterAll {
...
}
并重新定义 afterAll :
override def afterAll: Unit = {
try {
spark.stop()
} finally {
super.afterAll
}
}
我们的解决方案在 Jenkins 中有一个 CI 作业,测试因以下错误而失败,Jenkins 作业开始频繁变得不稳定:
Message d'erreur
Job 9 cancelled because SparkContext was shut down
Pile d'exécution
org.apache.spark.SparkException: Job 9 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:820)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:818)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1732)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1651)
at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1921)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
at org.apache.spark.SparkContext$$anonfun.apply$mcV$sp(SparkContext.scala:581)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
at org.apache.spark.sql.Dataset$$anonfun$collect.apply(Dataset.scala:2390)
at org.apache.spark.sql.Dataset$$anonfun$collect.apply(Dataset.scala:2390)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2390)
// some business classes
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.WordSpecLike$$anon.apply(WordSpecLike.scala:1078)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
at org.scalatest.WordSpecLike$class.invokeWithFixture(WordSpecLike.scala:1075)
at org.scalatest.WordSpecLike$$anonfun$runTest.apply(WordSpecLike.scala:1088)
at org.scalatest.WordSpecLike$$anonfun$runTest.apply(WordSpecLike.scala:1088)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
at org.scalatest.WordSpecLike$$anonfun$runTests.apply(WordSpecLike.scala:1147)
at org.scalatest.WordSpecLike$$anonfun$runTests.apply(WordSpecLike.scala:1147)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:410)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1147)
at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
at org.scalatest.WordSpecLike$$anonfun$run.apply(WordSpecLike.scala:1192)
at org.scalatest.WordSpecLike$$anonfun$run.apply(WordSpecLike.scala:1192)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1192)
// some business classes
at org.scalatest.BeforeAndAfterAll$class.liftedTree1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
// some business classes
at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
at sbt.TestRunner.runTest(TestFramework.scala:106)
at sbt.TestRunner.run(TestFramework.scala:117)
at sbt.TestFramework$$anon$$anonfun$$lessinit$greater.$anonfun$apply(TestFramework.scala:262)
at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:233)
at sbt.TestFramework$$anon$$anonfun$$lessinit$greater.apply(TestFramework.scala:262)
at sbt.TestFramework$$anon$$anonfun$$lessinit$greater.apply(TestFramework.scala:262)
at sbt.TestFunction.apply(TestFramework.scala:271)
at sbt.Tests$.processRunnable(Tests.scala:307)
at sbt.Tests$.$anonfun$makeSerial(Tests.scala:313)
at sbt.std.Transform$$anon.$anonfun$apply(System.scala:46)
at sbt.std.Transform$$anon.work(System.scala:66)
at sbt.Execute.$anonfun$submit(Execute.scala:262)
at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:16)
at sbt.Execute.work(Execute.scala:271)
at sbt.Execute.$anonfun$submit(Execute.scala:262)
at sbt.ConcurrentRestrictions$$anon.$anonfun$submitValid(ConcurrentRestrictions.scala:174)
at sbt.CompletionService$$anon.call(CompletionService.scala:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
当我们单独运行某个测试时,它能成功通过,没有问题。
我遇到了类似的问题。
我使用 Spark 运行了多个测试套件,但只有第一个套件能通过。我在所有套件中都调用了 spark.close()
。把这个调用从所有套件中删除后,测试就都能正常工作了。
调查 SparkSession 的源代码后,我的结论是:由于每个 JVM 只能有一个活动的 SparkContext
,而所有测试又都运行在同一个 JVM 上,所以当你第一次停止它时,这个 JVM 级别的"会话"就无法再使用了。
我们有一个包含多个测试套件的大项目,每个测试套件平均有 3 个测试。
对于我们的单元测试,我们使用 Spark Standalone,因此没有 Yarn 作为资源管理器。 每个测试套件:
启动 spark 会话:
implicit val spark = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
扩展 BeforeAndAfterAll
:
class MyTestsSpec extends WordSpec
with Matchers
with BeforeAndAfterAll {
...
}
并重新定义 afterAll :
override def afterAll: Unit = {
try {
spark.stop()
} finally {
super.afterAll
}
}
我们的解决方案在 Jenkins 中有一个 CI 作业,测试因以下错误而失败,Jenkins 作业开始频繁变得不稳定:
Message d'erreur
Job 9 cancelled because SparkContext was shut down
Pile d'exécution
org.apache.spark.SparkException: Job 9 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:820)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:818)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1732)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1651)
at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1921)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
at org.apache.spark.SparkContext$$anonfun.apply$mcV$sp(SparkContext.scala:581)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
at org.apache.spark.sql.Dataset$$anonfun$collect.apply(Dataset.scala:2390)
at org.apache.spark.sql.Dataset$$anonfun$collect.apply(Dataset.scala:2390)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2390)
// some business classes
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.WordSpecLike$$anon.apply(WordSpecLike.scala:1078)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
at org.scalatest.WordSpecLike$class.invokeWithFixture(WordSpecLike.scala:1075)
at org.scalatest.WordSpecLike$$anonfun$runTest.apply(WordSpecLike.scala:1088)
at org.scalatest.WordSpecLike$$anonfun$runTest.apply(WordSpecLike.scala:1088)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
at org.scalatest.WordSpecLike$$anonfun$runTests.apply(WordSpecLike.scala:1147)
at org.scalatest.WordSpecLike$$anonfun$runTests.apply(WordSpecLike.scala:1147)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:410)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1147)
at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
at org.scalatest.WordSpecLike$$anonfun$run.apply(WordSpecLike.scala:1192)
at org.scalatest.WordSpecLike$$anonfun$run.apply(WordSpecLike.scala:1192)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1192)
// some business classes
at org.scalatest.BeforeAndAfterAll$class.liftedTree1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
// some business classes
at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
at sbt.TestRunner.runTest(TestFramework.scala:106)
at sbt.TestRunner.run(TestFramework.scala:117)
at sbt.TestFramework$$anon$$anonfun$$lessinit$greater.$anonfun$apply(TestFramework.scala:262)
at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:233)
at sbt.TestFramework$$anon$$anonfun$$lessinit$greater.apply(TestFramework.scala:262)
at sbt.TestFramework$$anon$$anonfun$$lessinit$greater.apply(TestFramework.scala:262)
at sbt.TestFunction.apply(TestFramework.scala:271)
at sbt.Tests$.processRunnable(Tests.scala:307)
at sbt.Tests$.$anonfun$makeSerial(Tests.scala:313)
at sbt.std.Transform$$anon.$anonfun$apply(System.scala:46)
at sbt.std.Transform$$anon.work(System.scala:66)
at sbt.Execute.$anonfun$submit(Execute.scala:262)
at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:16)
at sbt.Execute.work(Execute.scala:271)
at sbt.Execute.$anonfun$submit(Execute.scala:262)
at sbt.ConcurrentRestrictions$$anon.$anonfun$submitValid(ConcurrentRestrictions.scala:174)
at sbt.CompletionService$$anon.call(CompletionService.scala:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
当我们单独运行某个测试时,它能成功通过,没有问题。
我遇到了类似的问题。
我使用 Spark 运行了多个测试套件,但只有第一个套件能通过。我在所有套件中都调用了 spark.close()
。把这个调用从所有套件中删除后,测试就都能正常工作了。
调查 SparkSession 的源代码后,我的结论是:由于每个 JVM 只能有一个活动的 SparkContext
,而所有测试又都运行在同一个 JVM 上,所以当你第一次停止它时,这个 JVM 级别的"会话"就无法再使用了。