Apache Spark: Scala XML transformation not serializable?
I have a bunch of XML files with DTD headers, and I'm trying to load all of them while ignoring the DTDs.
val input = sc.wholeTextFiles("""\path\*.nxml""")
val saxfac = SAXParserFactory.newInstance();
saxfac.setValidating(false);
saxfac.setFeature("http://xml.org/sax/features/validation", false);
saxfac.setFeature("http://apache.org/xml/features/nonvalidating/load-dtd-grammar", false);
saxfac.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
saxfac.setFeature("http://xml.org/sax/features/external-general-entities", false);
saxfac.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
val loadnode = input.map { case (k,v) => xml.XML.withSAXParser(saxfac.newSAXParser()).loadString(v)}
println(loadnode.count())
I'm getting a strange error... (caused by the SAX parser). What am I doing wrong?
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.map(RDD.scala:271)
at graphXtutorial.PubMedMainApp$.main(PubMedMainApp.scala:59)
at graphXtutorial.PubMedMainApp.main(PubMedMainApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.io.NotSerializableException: scala.xml.XML$$anon
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 10 more
Spark tasks must be Java-serializable so they can be shipped to other cluster nodes to run. Try constructing the parser inside the map, so that you aren't trying to share a single parser instance across cluster nodes (or, better, use something like mapPartitions so you construct one parser instance per partition, as sketched below - constructing one per record can add a lot of overhead).
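A minimal sketch of the mapPartitions approach, assuming input is the RDD produced by sc.wholeTextFiles above and reusing the same SAXParserFactory settings from the question. The factory and parser are built on the executor, once per partition, so nothing non-serializable is captured by the closure:

import javax.xml.parsers.SAXParserFactory

val loadnode = input.mapPartitions { iter =>
  // Build the non-serializable parser on the executor, once per partition.
  val saxfac = SAXParserFactory.newInstance()
  saxfac.setValidating(false)
  saxfac.setFeature("http://xml.org/sax/features/validation", false)
  saxfac.setFeature("http://apache.org/xml/features/nonvalidating/load-dtd-grammar", false)
  saxfac.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
  saxfac.setFeature("http://xml.org/sax/features/external-general-entities", false)
  saxfac.setFeature("http://xml.org/sax/features/external-parameter-entities", false)
  val loader = xml.XML.withSAXParser(saxfac.newSAXParser())
  // Records within a partition are processed sequentially, so reusing one loader is safe.
  iter.map { case (path, content) => loader.loadString(content) }
}
println(loadnode.count())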
I know I'm years late, but I ran into this post while struggling with the same issue, so I wanted to share my solution.
// Field is the case class this parser produces; its fields mirror the XML elements read below.
case class Field(name: String, key: String, description: String, fullPathKey: String, value: String)

class XMLParser extends Serializable {
  // @transient keeps the parsing function out of the serialized closure;
  // lazy val rebuilds it on each executor when it is first used.
  @transient lazy val parseXml = (xmlString: String) => {
    if (null != xmlString && xmlString.startsWith("<")) {
      val parsedElem = scala.xml.XML.loadString(xmlString)
      val fields = parsedElem \ "field"
      fields.map(node =>
        Field((node \ "name").text, (node \ "key").text, (node \ "description").text,
          (node \ "fullPathKey").text, (node \ "value").text))
    } else {
      Nil
    }
  }
}
The general way to solve task-not-serializable problems like this is to mark the non-serializable code as a @transient lazy val and encapsulate it in a serializable class. That way Spark will not serialize the variable; it will instead be initialized once per executor.
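For completeness, a hypothetical usage sketch; the RDD name xmlStrings is an assumption, standing in for any RDD[String] of XML snippets:

val parser = new XMLParser                       // serializable wrapper, safe to capture in a closure
val fieldsRdd = xmlStrings.map(parser.parseXml)  // parseXml itself is rebuilt lazily on each executor
fieldsRdd.take(5).foreach(println)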