How to generate a complex XML using Spark-Xml

I am trying to generate a complex XML from my JavaRDD<Book> and JavaRDD<Review>. How can I combine the two to generate the XML below?

<xml>
<library>
    <books>
        <book>
            <author>test</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
    </reviews>
</library>

As you can see, there is a parent root element, library, which has books and reviews as its children.

Below is how I generate the Book and Review DataFrames:

DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);

I know how to generate XML from a DataFrame. My doubt is specifically about having library as the rootTag, with books and reviews as its children.

I am using Java, but you can write a Scala or Python example if you can point me in the right direction.

This may not be the most efficient way to do it with Spark, but the code below works as you want. (It is in Scala, though, because my Java is a bit rusty.)

import java.io.{File, PrintWriter}
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.io.Source

val spark = SparkSession.builder()
  .master("local[3]")
  .appName("test")
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()

import spark.implicits._

/* Some code to test */
case class Book(author: String)
case class Review(id: Int)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)


val bookFrame = List(
  Book("book1"),
  Book("book2"),
  Book("book3"),
  Book("book4"),
  Book("book5")
).toDS()


val reviewFrame = List(
  Review(1),
  Review(2),
  Review(3),
  Review(4)
).toDS()

/* End test code **/

// Using databricks api save as 1 big xml file (instead of many parts, using repartition)
// You don't have to use repartition, but each part-xxx file will wrap contents in the root tag, making it harder to concat later.
// And TBH it really doesn't matter that Spark is doing the merging here, since the combining of data is already on the master node only
bookFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .mode(SaveMode.Overwrite)
  .save("/tmp/books/") // store to temp location

// Same for reviews
reviewFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "reviews")
  .option("rowTag", "review")
  .mode(SaveMode.Overwrite)
  .save("/tmp/review") // store to temp location


def concatFiles(path: String): List[String] = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .flatMap { file =>
      val source = Source.fromFile(file)
      try source.getLines().toList // read eagerly so the file handle can be closed
      finally source.close()
    }
    .map("    " + _) // prefix with spaces to allow for new root level xml
    .toList
}

val lines = List("<xml>","<library>") ++ concatFiles("/tmp/books/") ++ concatFiles("/tmp/review/") ++ List("</library>")
new PrintWriter("/tmp/target.xml"){
  write(lines.mkString("\n"))
  close
}

Result:

<xml>
<library>
    <books>
        <book>
            <author>book1</author>
        </book>
        <book>
            <author>book2</author>
        </book>
        <book>
            <author>book3</author>
        </book>
        <book>
            <author>book4</author>
        </book>
        <book>
            <author>book5</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
        <review>
            <id>2</id>
        </review>
        <review>
            <id>3</id>
        </review>
        <review>
            <id>4</id>
        </review>
    </reviews>
</library>

Another approach (using Spark only) could be to create a new object, case class BookReview(books: List[Book], reviews: List[Review]), and store it as XML after you .collect() all the books and reviews into a single list.

At that point, though, I would not use Spark to handle a single record (BookReview). I would use a plain XML library (XStream or similar) to store this object.
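
Since the question uses Java, here is a minimal sketch of that idea using the JDK's built-in StAX writer (javax.xml.stream) rather than XStream. It assumes the books and reviews have already been collected to the driver (for example with bookRDD.collect()) and fit in memory. The class name LibraryXmlWriter and the toXml helper are made up for illustration, and they take plain author strings and review ids instead of the Book and Review beans.

```java
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

public class LibraryXmlWriter {

    // Hypothetical helper: builds <library><books>...</books><reviews>...</reviews></library>
    // from values that were already collected on the driver.
    static String toXml(List<String> authors, List<Integer> reviewIds) {
        StringWriter out = new StringWriter();
        try {
            XMLStreamWriter xml = XMLOutputFactory.newInstance().createXMLStreamWriter(out);

            xml.writeStartElement("library");

            xml.writeStartElement("books");
            for (String author : authors) {
                xml.writeStartElement("book");
                xml.writeStartElement("author");
                xml.writeCharacters(author); // escapes markup characters such as & and <
                xml.writeEndElement();       // </author>
                xml.writeEndElement();       // </book>
            }
            xml.writeEndElement();           // </books>

            xml.writeStartElement("reviews");
            for (Integer id : reviewIds) {
                xml.writeStartElement("review");
                xml.writeStartElement("id");
                xml.writeCharacters(String.valueOf(id));
                xml.writeEndElement();       // </id>
                xml.writeEndElement();       // </review>
            }
            xml.writeEndElement();           // </reviews>

            xml.writeEndElement();           // </library>
            xml.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Could not write library XML", e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Stand-in data; in the real job these would come from the collected RDDs.
        System.out.println(toXml(Arrays.asList("book1", "book2"), Arrays.asList(1, 2)));
    }
}
```

The output of this sketch is a single unindented line. Because everything passes through the driver, it only makes sense when the combined data is small, which is the same limitation as collecting into a BookReview.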


Update: the List concat approach is not memory friendly, so here is a solution using streams and buffers that could replace the concatFiles method.

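// Needed in addition to the File and PrintWriter imports used earlier
import java.io.{BufferedOutputStream, BufferedReader, FileInputStream, FileOutputStream, InputStreamReader}

def outputConcatFiles(path: String, outputFile: File): Unit = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .foreach(file => {
      // open the output in append mode so content written earlier is kept
      val writer = new BufferedOutputStream(new FileOutputStream(outputFile, true))
      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))

      try {
        // read line by line so only one line is held in memory at a time
        Iterator.continually(reader.readLine())
          .takeWhile(_ != null)
          .foreach(line =>
            writer.write(s"    $line\n".getBytes)
          )
      } catch {
        case e: Exception => println(e.getMessage)
      } finally {
        writer.close()
        reader.close()
      }
    })
}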

val outputFile = new File("/tmp/target2.xml")
new PrintWriter(outputFile) { write("<xml>\n<library>\n"); close}
outputConcatFiles("/tmp/books/", outputFile)
outputConcatFiles("/tmp/review/", outputFile)
new PrintWriter(new FileOutputStream(outputFile, true)) { append("</library>"); close}