访问广播变量时出现 Spark UDF 异常
Spark UDF exception when accessing broadcast variable
我无法从 Spark UDF 内部访问 scala.collection.immutable.Map
。
我正在广播地图
val browserLangMap = sc.broadcast (Source.fromFile(browserLangFilePath).getLines.map(_.split(,)).map(e => (e(0).toInt,e(1))).toMap)
创建访问地图的 UDF
def addBrowserCode = udf((browserLang:Int) => if(browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")`
使用 UDF 添加新列
val joinedDF = rawDF.join(broadcast(geoDF).as("GEO"), $"start_ip" === $"GEO.start_ip_num", "left_outer")
.withColumn("browser_code", addBrowserCode($"browser_language"))
.selectExpr(getSelectQuery:_*)
完整堆栈跟踪 --> https://www.dropbox.com/s/p1d5322fo9cxro6/stack_trace.txt?dl=0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$
Serialization stack:
- object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$@30b4ba52)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: MetaDataSchema$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(browser_language#235))
- field (class: org.apache.spark.sql.catalyst.expressions.If, name: falseValue, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.If, if (isnull(browser_language#235)) null else UDF(browser_language#235))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, if (isnull(browser_language#235)) null else UDF(browser_language#235) AS browser_language#507)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@5ae38c4e)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@5ae38c4e))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 80 more
我知道这是访问广播地图导致的。当我在 UDF 中删除对它的引用时,也不例外。
def addBrowserCode = udf((browserLang:Int) => browserLang.toString()) //Test UDF without accessing broadcast Map and it works
Spark 版本 1.6
我发现这是在 spark shell 中使用“:paste”的奇怪行为。仅当我使用 :paste.
将整个代码粘贴到一个多行粘贴中时才会发生这种情况
如果我先粘贴广播和 UDF 创建,然后将 join+saveToFile 粘贴到单独的 :paste 中,同样的代码可以完美运行。
可能是 scala shell 问题。我不知道。
根本原因与在代码中为广播变量声明 val sc: SparkContext = spark.sparkContext
有关。如果代码在 spark-shell 上运行,默认情况下 sc 已经可用。声明 sc 两次(一次默认,一次在代码中)导致这个 "Task not serializable" 问题。因此,与之前声称的答案不同,spark-shell 没有问题。只需在spark-shell中暂时去掉SparkContext声明,代码就OK了。
我无法从 Spark UDF 内部访问 scala.collection.immutable.Map
。
我正在广播地图
val browserLangMap = sc.broadcast (Source.fromFile(browserLangFilePath).getLines.map(_.split(,)).map(e => (e(0).toInt,e(1))).toMap)
创建访问地图的 UDF
def addBrowserCode = udf((browserLang:Int) => if(browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")`
使用 UDF 添加新列
val joinedDF = rawDF.join(broadcast(geoDF).as("GEO"), $"start_ip" === $"GEO.start_ip_num", "left_outer")
.withColumn("browser_code", addBrowserCode($"browser_language"))
.selectExpr(getSelectQuery:_*)
完整堆栈跟踪 --> https://www.dropbox.com/s/p1d5322fo9cxro6/stack_trace.txt?dl=0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$
Serialization stack:
- object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$@30b4ba52)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: MetaDataSchema$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(browser_language#235))
- field (class: org.apache.spark.sql.catalyst.expressions.If, name: falseValue, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.If, if (isnull(browser_language#235)) null else UDF(browser_language#235))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, if (isnull(browser_language#235)) null else UDF(browser_language#235) AS browser_language#507)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@5ae38c4e)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@5ae38c4e))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 80 more
我知道这是访问广播地图导致的。当我在 UDF 中删除对它的引用时,也不例外。
def addBrowserCode = udf((browserLang:Int) => browserLang.toString()) //Test UDF without accessing broadcast Map and it works
Spark 版本 1.6
我发现这是在 spark shell 中使用“:paste”的奇怪行为。仅当我使用 :paste.
将整个代码粘贴到一个多行粘贴中时才会发生这种情况如果我先粘贴广播和 UDF 创建,然后将 join+saveToFile 粘贴到单独的 :paste 中,同样的代码可以完美运行。
可能是 scala shell 问题。我不知道。
根本原因与在代码中为广播变量声明 val sc: SparkContext = spark.sparkContext
有关。如果代码在 spark-shell 上运行,默认情况下 sc 已经可用。声明 sc 两次(一次默认,一次在代码中)导致这个 "Task not serializable" 问题。因此,与之前声称的答案不同,spark-shell 没有问题。只需在spark-shell中暂时去掉SparkContext声明,代码就OK了。