Dataset forEach loop in scala throwing SparkException Task not serializable
My question is very similar to this one, except that I'm using Scala.
For the code below:
roleList = res.select($"results", explode($"results").as("results_flat1"))
  .select("results_flat1.*")
  .select(explode($"rows").as("rows"))
  .select($"rows".getItem(0).as("x"))
  .withColumn("y", trim(col("x")))
  .select($"y".as("ROLE_NAME"))
  .map(row => Role(row.getAs[String](0)))

if (roleList.count() != 0) {
  println(s"Number of Roles = ${roleList.count()}")
  roleList.foreach { role =>
    var status = ""
    do {
      val response = getDf()
      response.show()
      status = response.select("status").head().getString(0)
      var error = ""
      error = response.select($"results", explode($"results").as("results_flat1"))
        .select("results_flat1.*")
        .select($"error")
        .first().get(0).asInstanceOf[String]
    } while (status != "completed")
  }
}
I am getting the following exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:925)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
at org.apache.spark.sql.Dataset$$anonfun$foreach.apply$mcV$sp(Dataset.scala:2716)
at org.apache.spark.sql.Dataset$$anonfun$foreach.apply(Dataset.scala:2716)
at org.apache.spark.sql.Dataset$$anonfun$foreach.apply(Dataset.scala:2716)
at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId.apply(Dataset.scala:3349)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2715)
at com.cloud.operations.RoleOperations.createRoles(RoleOperations.scala:30)
at com.cloud.Main$.main(Main.scala:24)
at com.cloud.Main.main(Main.scala)
Caused by: java.io.NotSerializableException: com.cloud.operations.RoleOperations
Serialization stack:
- object not serializable (class: com.cloud.operations.RoleOperations, value: com.cloud.operations.RoleOperations@67a3394c)
- field (class: com.cloud.operations.RoleOperations$$anonfun$createRoles, name: $outer, type: class com.cloud.operations.RoleOperations)
- object (class com.cloud.operations.RoleOperations$$anonfun$createRoles, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 21 more
RoleOperations.scala:30 refers to the line where roleList.foreach begins.
I'm not quite sure why this happens. Going by the answers to the linked question, I don't use the SparkContext anywhere in my code, although getDf() does use spark.read.json (from the SparkSession). And even if that were the cause, the exception occurs not at that line but at the line above it, which confuses me. Please help me resolve this.
First of all, you cannot use a Spark session inside a function that runs on the executors. SparkSession may only be used from driver code.
In your case, everything inside roleList.foreach is executed on the executors, not on the driver.
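A minimal sketch of one way to restructure this, assuming the role list is small enough to fit on the driver: collect it first, then run the polling loop as a plain Scala loop on the driver, where the SparkSession is available. getDf() and Role are taken from the question; the error extraction is omitted for brevity.

val roles: Array[Role] = roleList.collect() // bring the (small) result set to the driver

roles.foreach { role =>
  var status = ""
  do {
    val response = getDf() // safe here: this loop runs on the driver
    response.show()
    status = response.select("status").head().getString(0)
  } while (status != "completed")
}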
Moreover, the same error can occur when executor code references a variable defined on a class. In that case, the whole class has to be shipped to the executors, and if it is not serializable, you get exactly this error.
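To illustrate that second point, here is a hypothetical, stripped-down version of what the serialization stack above is reporting: referencing a class field inside the closure captures this (the $outer field in the stack), so Spark tries to serialize the whole non-serializable class. The field and method names below are made up for the example.

import org.apache.spark.sql.{Dataset, SparkSession}

class RoleOperations(spark: SparkSession) {

  private val prefix = "ROLE_" // a field on this (non-serializable) class

  // Fails: the lambda reads `prefix`, so it captures `this`, and the whole
  // RoleOperations instance must be serialized and shipped to the executors.
  def createRolesBad(names: Dataset[String]): Unit =
    names.foreach(n => println(prefix + n))

  // Works: copy the field into a local val first; only that String is
  // captured by the closure, and String is serializable.
  def createRolesGood(names: Dataset[String]): Unit = {
    val p = prefix
    names.foreach(n => println(p + n))
  }
}

Marking the class as Serializable can also make the error go away, but extracting just the values the closure needs into local vals is usually the cleaner fix, since it avoids shipping the whole object to the executors.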