Dataset forEach loop in scala throwing SparkException Task not serializable

My question is very similar to this one, except that I am using Scala.

For the code below:

        roleList = res.select($"results", explode($"results").as("results_flat1"))
                        .select("results_flat1.*")
                        .select(explode($"rows").as("rows"))
                        .select($"rows".getItem(0).as("x"))
                        .withColumn("y", trim(col("x")))
                        .select($"y".as("ROLE_NAME"))
                        .map(row => Role(row.getAs[String](0)))

        if (roleList.count() != 0) {
            println(s"Number of Roles = ${roleList.count()}")

            roleList.foreach{role =>
                var status = ""

                do {
                    val response = getDf()
                    response.show()

                    status = response.select("status").head().getString(0)
                    var error = ""

                    error = response.select($"results", explode($"results").as("results_flat1"))
                                .select("results_flat1.*")
                                .select($"error")
                                .first().get(0).asInstanceOf[String]
                }
                while (status != "completed")
            }
        }

I am getting the following exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
    at org.apache.spark.sql.Dataset$$anonfun$foreach.apply$mcV$sp(Dataset.scala:2716)
    at org.apache.spark.sql.Dataset$$anonfun$foreach.apply(Dataset.scala:2716)
    at org.apache.spark.sql.Dataset$$anonfun$foreach.apply(Dataset.scala:2716)
    at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId.apply(Dataset.scala:3349)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
    at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2715)
    at com.cloud.operations.RoleOperations.createRoles(RoleOperations.scala:30)
    at com.cloud.Main$.main(Main.scala:24)
    at com.cloud.Main.main(Main.scala)
Caused by: java.io.NotSerializableException: com.cloud.operations.RoleOperations
Serialization stack:
    - object not serializable (class: com.cloud.operations.RoleOperations, value: com.cloud.operations.RoleOperations@67a3394c)
    - field (class: com.cloud.operations.RoleOperations$$anonfun$createRoles, name: $outer, type: class com.cloud.operations.RoleOperations)
    - object (class com.cloud.operations.RoleOperations$$anonfun$createRoles, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 21 more

RoleOperations.scala:30 refers to the line where roleList.foreach begins.

I am not sure why this is happening. Going by the answer to the linked question, I don't use the SparkContext anywhere in my code, although getDf() does use spark.read.json (from a SparkSession). Even if that were the cause, the exception doesn't occur on that line but on the line above it, which is what confuses me. Please help me resolve this.

First of all, you cannot use the Spark session inside a function that runs on the executors. A SparkSession can only be used from driver code.

In your case, everything inside roleList.foreach is executed on the executors, not on the driver.
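
One common workaround, sketched below rather than taken from your exact code, is to collect the roles back to the driver before looping, so that the loop body (including getDf(), which needs the SparkSession) runs as driver code. This assumes roleList is small enough to fit in driver memory:

    // Sketch only: collect() pulls the roles to the driver, so the SparkSession
    // calls inside the loop are legal. Assumes roleList is small.
    roleList.collect().foreach { role =>
      var status = ""
      do {
        val response = getDf()   // now executed on the driver
        status = response.select("status").head().getString(0)
      } while (status != "completed")
    }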

The same error can also occur when executor code references a variable defined in a class. In that case the whole class has to be shipped to the executors, and if it is not serializable you get exactly this error. A minimal sketch of that situation follows.
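
The field and method names below are made up for illustration; your actual RoleOperations is surely different, but the capture mechanism is the same:

    import org.apache.spark.sql.Dataset

    class RoleOperations {                        // does not extend Serializable
      val prefix = "ROLE_"

      // Fails with "Task not serializable": `prefix` is really `this.prefix`,
      // so the closure captures the whole RoleOperations instance.
      def bad(ds: Dataset[String]): Unit =
        ds.foreach(r => println(prefix + r))

      // Works: copy the field into a local val first; only the String is serialized.
      def ok(ds: Dataset[String]): Unit = {
        val p = prefix
        ds.foreach(r => println(p + r))
      }
    }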