XML source for Spark and groupby operation

I am using the XML source from Databricks (spark-xml). Here is my sample XML data:

<ds Name="abc">
   <node begin="18" end="22" val="Organic" type="type1">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="22" end="23" val="Cereal">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="23" end="25" val="Kellogs" type="type2">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="22" end="23" val="Harry" type="type1">
      <hs id="1" begin="108" end="520" />
   </node>
   <node begin="23" end="25" val="Potter" type="type1">
      <hs id="1" begin="108" end="520" />
   </node>
</ds>

I want to combine all the node.val values (in the same order in which they appear in the XML file), grouped by hs id.

For example, the output for the above data should be:

Name   hs id   Val
abc    0       Organic Cereal
abc    1       Harry Potter

This is how I load the XML source from Databricks:

val df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "ds")
.option("attributePrefix", "")
.load(args(0))

df.registerTempTable("ds")

I am not sure how to group the dataset by hs id while making sure the order is preserved.

val df_ds = sqlContext.sql("SELECT Name, node.type FROM ds")
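
One direction that might work (I have not verified it) is to explode the node array together with each element's position and group afterwards. A minimal sketch, assuming Spark 2.1+ where posexplode, sort_array and concat_ws over array columns are available; the nested field names (n.hs.id, n.val) follow the schema printed further below:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Explode the node array, keeping each element's position so the
// original XML order can be restored after grouping.
val exploded = df
  .select($"Name", posexplode($"node").as(Seq("pos", "n")))
  .select($"Name", $"pos", $"n.hs.id".as("hs_id"), $"n.val".as("val"))

// Collect (pos, val) pairs per (Name, hs_id), sort them by position,
// then join the vals with a space.
val grouped = exploded
  .groupBy($"Name", $"hs_id")
  .agg(sort_array(collect_list(struct($"pos", $"val"))).as("vals"))
  .select($"Name", $"hs_id", concat_ws(" ", $"vals.val").as("Val"))

grouped.show()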

What I have tried:

import scala.collection.mutable.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val comb = udf((rows: Seq[Row]) => {
  // Collect the val strings per hs.id; LinkedHashMap keeps the ids in
  // the order they are first seen, i.e. their order in the XML file.
  val result = LinkedHashMap[Long, Array[String]]()
  for (row <- rows) {
     val id = row.getAs[Row]("hs").getAs[Long]("id")
     result(id) = result.getOrElse(id, Array[String]()) :+ row.getAs[String]("val")
  }
  // One space-joined string per hs id.
  result.values.toArray.map(x => x.mkString(" "))
})

The schema of df, for reference:

df.printSchema
root
 |-- Name: string (nullable = true)
 |-- node: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- begin: long (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- hs: struct (nullable = true)
 |    |    |    |-- #VALUE: string (nullable = true)
 |    |    |    |-- begin: long (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- val: string (nullable = true)

df.withColumn("comb", comb(df("node")))