Converting a DataFrame to XML in Spark throws a NullPointerException in StaxXML while writing to the file system

I am reading an XML file with sparkSession, based on a given rowTag. The resulting DataFrame then needs to be converted back into an XML file. Below is the code I am trying:

import org.apache.spark.sql.SparkSession

// SEAConstant is the poster's own constants object (presumably "com.databricks.spark.xml" and "rowTag")
val sparkSession = SparkSession.builder.master("local[*]").getOrCreate()
val xmldf = sparkSession.read.format(SEAConstant.STR_IMPORT_SPARK_DATA_BRICK_XML)
  .option(SEAConstant.STR_ROW_TAG, "Employee")
  .option("nullValue", "")
  .load("demo.xml")
val columnNames = xmldf.columns.toSeq
val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
sdf.write.format("com.databricks.spark.xml")
  .option("rootTag", "Company")
  .option("rowTag", "Employee")
  .save("Rel")

Here is the XML file:

<?xml version="1.0"?>
<Employee id="id47" masterRef="#id53" revision="" nomenclature="">
  <ApplicationRef version="J.0" application="Teamcenter"></ApplicationRef>
  <UserData id="id52">
    <UserValue valueRef="#id4" value="" title="_CONFIG_CONTEXT"></UserValue>
  </UserData>
</Employee>
<Employee id="id47" masterRef="#id53" revision="" nomenclature="">
  <ApplicationRef version="B.0" application="Teamcenter"></ApplicationRef>
  <UserData id="id63">
    <UserValue valueRef="#id5" value="" title="_CONFIG_CONTEXT"></UserValue>
  </UserData>
</Employee>

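The problem is this: if I create sdf by selecting only 3 of the columns from xmldf, it works fine and creates the XML file. But if I pass all the columns, even when there are only 2 or 3 of them, it fails with the following error: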

19/06/25 14:45:14 ERROR Utils: Aborting task
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$$anonfun$apply.apply(StaxXmlGenerator.scala:131)
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$$anonfun$apply.apply(StaxXmlGenerator.scala:129)
    at scala.collection.immutable.List.foreach(List.scala:383)
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:129)
    at com.databricks.spark.xml.util.XmlFile$$anonfun$$anon.next(XmlFile.scala:108)
    at com.databricks.spark.xml.util.XmlFile$$anonfun$$anon.next(XmlFile.scala:96)
    at scala.collection.Iterator$$anon.next(Iterator.scala:363)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun.apply(SparkHadoopWriter.scala:125)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/06/25 14:45:14 ERROR SparkHadoopWriter: Task attempt_20190625144513_0012_m_000000_0 aborted.
19/06/25 14:45:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I have looked in many places but could not find a solution. Also, using the same sdf created above, I am able to write a JSON file successfully. Any ideas?

xmldf.write.format("com.databricks.spark.xml")
  .option("rootTag", "Company")
  .option("rowTag", "Employee")
  .option("attributePrefix", "_Att")
  .option("valueTag", "_VALUE")
  .save("Rel")

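Replace the corresponding statement in the OP's code with this. The XML writer (StaxXmlGenerator in the stack trace) is actually looking for these attributePrefix and valueTag options, and without them it throws an NPE. I found this while looking at this GitHub link.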
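To make the role of attributePrefix concrete: going by the option descriptions quoted further down, a field is written as an XML attribute when its name starts with attributePrefix, and otherwise as a child element. The pure-Scala sketch below models only that split. It is a simplified illustration, not spark-xml's code: `AttributePartitionSketch` and `partitionFields` are hypothetical names, excluding the valueTag field from attributes is an assumption, and the column list is the schema one would expect spark-xml to infer for the OP's <Employee> rows with the default `_` prefix.

```scala
// Simplified model (NOT spark-xml's actual code) of how a writer could split
// a row's top-level fields into XML attributes and child elements.
object AttributePartitionSketch {

  /** Returns (attributeNames, elementNames).
    * Assumption: a field is an attribute when its name starts with
    * attributePrefix and it is not the valueTag field.
    */
  def partitionFields(
      fieldNames: Seq[String],
      attributePrefix: String,
      valueTag: String): (Seq[String], Seq[String]) =
    fieldNames.partition(name => name.startsWith(attributePrefix) && name != valueTag)

  def main(args: Array[String]): Unit = {
    // Columns expected for the OP's <Employee> rows when read with the default "_" prefix
    val columns = Seq("ApplicationRef", "UserData", "_id", "_masterRef", "_nomenclature", "_revision")

    // Default prefix: attributes are _id, _masterRef, _nomenclature, _revision
    val (attrsDefault, elemsDefault) = partitionFields(columns, "_", "_VALUE")
    println(s"prefix '_'    -> attributes=$attrsDefault elements=$elemsDefault")

    // "_Att" prefix: no column matches, so every field is treated as a child element
    val (attrsAtt, elemsAtt) = partitionFields(columns, "_Att", "_VALUE")
    println(s"prefix '_Att' -> attributes=$attrsAtt elements=$elemsAtt")
  }
}
```

Under this model, switching to "_Att" means none of the OP's columns match the prefix, so `_id`, `_masterRef` and the other former attributes would be written as child elements rather than attributes. That trade-off is worth checking in the files the workaround produces.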



The following was tested with Spark 2.2.0 and the spark-xml dependency.



package com.examples

import java.io.File
import java.nio.charset.StandardCharsets

import org.apache.commons.io.FileUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * Created by Ram Ghadiyaram
  */
object SparkXmlTest {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .master("local[*]")
      .appName(getClass.getName)
      .getOrCreate()

    // Sample input; the <Measures> rows are not read because rowTag is "Employee"
    val str =
      """
        |<?xml version="1.0"?>
        |<Company>
        |  <Employee id="1">
        |      <Email>tp@xyz.com</Email>
        |      <UserData id="id32" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="23lb" title="Weight"></UserValue>
        |</UserData>
        |  </Employee>
        |  <Measures id="1">
        |      <Email>newdata@rty.com</Email>
        |      <UserData id="id32" type="SitesInContext">
        |</UserData>
        |  </Measures>
        |  <Employee id="2">
        |      <Email>tp@xyz.com</Email>
        |      <UserData id="id33" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="34lb" title="Weight"></UserValue>
        |</UserData>
        |  </Employee>
        |  <Measures id="2">
        |      <Email>nextrow@rty.com</Email>
        |      <UserData id="id35" type="SitesInContext">
        |</UserData>
        |  </Measures>
        |  <Employee id="3">
        |      <Email>tp@xyz.com</Email>
        |      <UserData id="id34" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="" title="Weight"></UserValue>
        |</UserData>
        |  </Employee>
        |</Company>
      """.stripMargin
    println("save to file ")

    val f = new File("xmltest.xml")
    FileUtils.writeStringToFile(f, str, StandardCharsets.UTF_8)

    val xmldf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("xmltest.xml")
    val columnNames = xmldf.columns.toSeq
    val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
    sdf.write.format("com.databricks.spark.xml")
      .mode("overwrite") // lets the example be re-run
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .save("xmltest_out")

    println("read back from saved file ....")
    val readbackdf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("xmltest_out")
    readbackdf.show(false)

    spark.stop()
  }
}



save to file 
read back from saved file ....
|Email     |UserData                                                                      |_id|
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,23lb]),id32,AttributesInContext]|1  |
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,34lb]),id33,AttributesInContext]|2  |
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,null]),id34,AttributesInContext]|3  |

Update: after the OP updated the XML, I tried again, got the exception, and fixed it with the following options:

      .option("attributePrefix", "_Att")
      .option("valueTag", "_VALUE")


This package allows reading XML files in a local or distributed filesystem as Spark DataFrames. When reading files, the API accepts several options:

path: Location of files. As in Spark, standard Hadoop globbing expressions are accepted.
rowTag: The row tag of your XML files to treat as a row. For example, in this XML <books><book>...</book>...</books>, the appropriate value would be book. Default is ROW. At the moment, rows containing self-closing XML tags are not supported.
samplingRatio: Sampling ratio for inferring schema (0.0 ~ 1). Default is 1. Possible types are StructType, ArrayType, StringType, LongType, DoubleType, BooleanType, TimestampType and NullType, unless the user provides a schema.
excludeAttribute : Whether you want to exclude attributes in elements or not. Default is false.
treatEmptyValuesAsNulls : (DEPRECATED: use nullValue set to "") Whether you want to treat whitespaces as a null value. Default is false
mode: The mode for dealing with corrupt records during parsing. Default is PERMISSIVE.
  PERMISSIVE:
    When it encounters a corrupted record, it sets all fields to null and puts the malformed string into a new field configured by columnNameOfCorruptRecord.
    When it encounters a field of the wrong datatype, it sets the offending field to null.
  DROPMALFORMED: ignores the whole corrupted records.
  FAILFAST: throws an exception when it meets corrupted records.
inferSchema: if true, attempts to infer an appropriate type for each resulting DataFrame column, like a boolean, numeric or date type. If false, all resulting columns are of string type. Default is true.
columnNameOfCorruptRecord: The name of new field where malformed strings are stored. Default is _corrupt_record.
attributePrefix: The prefix for attributes so that we can differentiate attributes and elements. This will be the prefix for field names. Default is _.
valueTag: The tag used for the value when there are attributes in an element that has no child elements. Default is _VALUE.
charset: Defaults to 'UTF-8' but can be set to other valid charset names.
ignoreSurroundingSpaces: Defines whether or not surrounding whitespaces from values being read should be skipped. Default is false.
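The three mode values above can be illustrated with a toy model. This is an assumption-laden sketch, not spark-xml's parser: records are taken as already classified into Left(rawText) for a corrupted record and Right(fields) for a well-formed one, None plays the role of null, and the wrong-datatype case of PERMISSIVE is not modeled. `ParseModeSketch` and `applyMode` are hypothetical names; `_corrupt_record` is the documented default of columnNameOfCorruptRecord.

```scala
// Toy model of the `mode` option (PERMISSIVE / DROPMALFORMED / FAILFAST).
// NOT spark-xml's implementation: records are assumed to be pre-classified as
// Left(rawText) for a corrupted record or Right(fields) for a well-formed one.
object ParseModeSketch {
  sealed trait Mode
  case object Permissive extends Mode
  case object DropMalformed extends Mode
  case object FailFast extends Mode

  // One output row: column name -> value, where None stands for null.
  type Row = Map[String, Option[String]]

  private def toRow(
      record: Either[String, Map[String, String]],
      schema: Seq[String],
      mode: Mode,
      corruptColumn: String): Option[Row] = record match {
    case Right(fields) =>
      // Well-formed record: schema fields missing from the record become null.
      Some(schema.map(f => f -> fields.get(f)).toMap + (corruptColumn -> None))
    case Left(raw) =>
      mode match {
        case Permissive =>
          // All fields null; the malformed text goes into the corrupt-record column.
          Some(schema.map(f => f -> Option.empty[String]).toMap + (corruptColumn -> Some(raw)))
        case DropMalformed =>
          None // the whole corrupted record is dropped
        case FailFast =>
          throw new IllegalArgumentException(s"Malformed record: $raw")
      }
  }

  def applyMode(
      records: Seq[Either[String, Map[String, String]]],
      schema: Seq[String],
      mode: Mode,
      corruptColumn: String = "_corrupt_record"): Seq[Row] =
    records.flatMap(r => toRow(r, schema, mode, corruptColumn))
}
```

With the real reader, the mode is passed as an ordinary option, for example `.option("mode", "DROPMALFORMED")`.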
When writing files, the API accepts several options:

path: Location to write files.
rowTag: The row tag of your XML files to treat as a row. For example, in this XML <books><book>...</book>...</books>, the appropriate value would be book. Default is ROW.
rootTag: The root tag of your XML files to treat as the root. For example, in this XML <books><book>...</book>...</books>, the appropriate value would be books. Default is ROWS.
nullValue: The value to write for null values. Default is string null. When this is null, attributes and elements are not written for null fields.
attributePrefix: The prefix for attributes so that we can differentiate attributes and elements. This will be the prefix for field names. Default is _.
valueTag: The tag used for the value when there are attributes in an element that has no child elements. Default is _VALUE.
compression: Compression codec to use when saving to file. Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec or one of the case-insensitive short names (bzip2, gzip, lz4, and snappy). Defaults to no compression when a codec is not specified.
The short format name is also supported: you can use just xml instead of com.databricks.spark.xml.
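The interplay of attributePrefix, valueTag and nullValue on write can be sketched with a toy renderer for a single flat element. It follows the option descriptions above rather than spark-xml's StaxXmlGenerator, does no nesting or XML escaping, and models "nullValue is null" as `nullValue = None`. `WriteOptionsSketch` and `renderElement` are hypothetical names, and the sample values are made up.

```scala
// Toy renderer for ONE flat XML element, following the write-option
// descriptions (attributePrefix, valueTag, nullValue). NOT spark-xml's
// StaxXmlGenerator: no nested structs, no XML escaping.
object WriteOptionsSketch {

  /** @param fields    (column name, value) pairs; None stands for a null value
    * @param nullValue Some(s) writes nulls as s; None models nullValue = null,
    *                  in which case null fields are not written at all
    */
  def renderElement(
      tag: String,
      fields: Seq[(String, Option[String])],
      nullValue: Option[String],
      attributePrefix: String = "_",
      valueTag: String = "_VALUE"): String = {
    // Resolve nulls first: substitute nullValue, or drop the field entirely.
    val resolved: Seq[(String, String)] = fields.flatMap { case (name, v) =>
      v.orElse(nullValue).map(value => name -> value).toList
    }
    // Prefixed fields (other than the valueTag field) become attributes.
    val (attrs, others) = resolved.partition { case (name, _) =>
      name.startsWith(attributePrefix) && name != valueTag
    }
    val attrText = attrs.map { case (name, value) =>
      " " + name.stripPrefix(attributePrefix) + "=\"" + value + "\""
    }.mkString
    // The valueTag field becomes the element's own text content.
    val text = others.collectFirst { case (name, value) if name == valueTag => value }.getOrElse("")
    // Every remaining field is written as a child element.
    val children = others.filter(_._1 != valueTag).map { case (name, value) =>
      s"<$name>$value</$name>"
    }.mkString
    s"<$tag$attrText>$text$children</$tag>"
  }
}
```

For instance, `renderElement("Email", Seq("_type" -> Some("work"), "_VALUE" -> Some("tp@xyz.com")), nullValue = None)` yields `<Email type="work">tp@xyz.com</Email>`: the prefixed field becomes an attribute and the valueTag field becomes the text.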


package com.examples

import java.io.File
import java.nio.charset.StandardCharsets

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession

/**
  * Created by Ram Ghadiyaram
  */
object SparkXmlTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName(getClass.getName)
      .getOrCreate()

    // The OP's updated sample: top-level attributes, some of them empty
    val str =
      """
        |  <Employee id="id47" masterRef="#id53" revision="" nomenclature="">
        |<ApplicationRef version="J.0" application="Teamcenter"></ApplicationRef>
        |<UserData id="id52">
        |<UserValue valueRef="#id4" value="" title="_CONFIG_CONTEXT"></UserValue></UserData></Employee>
        |<Employee id="id47" masterRef="#id53" revision="" nomenclature="">
        |<ApplicationRef version="B.0" application="Teamcenter"></ApplicationRef>
        |<UserData id="id63">
        |<UserValue valueRef="#id5" value="" title="_CONFIG_CONTEXT"></UserValue></UserData></Employee>
      """.stripMargin
    println("save to file ")

    val f = new File("xmltest.xml")
    FileUtils.writeStringToFile(f, str, StandardCharsets.UTF_8)

    val xmldf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("xmltest.xml")
    val columnNames = xmldf.columns.toSeq
    val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
    sdf.write.format("com.databricks.spark.xml")
      .mode("overwrite") // lets the example be re-run
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("attributePrefix", "_Att") // the two options added as the fix
      .option("valueTag", "_VALUE")
      .save("xmltest_out")

    println("read back from saved file ....")
    val readbackdf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("xmltest_out")
    readbackdf.show(false)

    spark.stop()
  }
}


save to file 
read back from saved file ....
|ApplicationRef   |UserData                       |_id |_masterRef|
|[Teamcenter, J.0]|[[_CONFIG_CONTEXT, #id4], id52]|id47|#id53     |
|[Teamcenter, B.0]|[[_CONFIG_CONTEXT, #id5], id63]|id47|#id53     |