XML Spark 和 groupby 操作的来源
XML source for Spark and groupby operation
我正在使用来自 databricks
的 XML
来源。这是我的 XML
示例数据。
<ds Name="abc">
<node begin="18" end="22" val="Organic" type="type1">
<hs id="0" begin="18" end="91" />
</node>
<node begin="22" end="23" val="Cereal">
<hs id="0" begin="18" end="91" />
</node>
<node begin="23" end="25" val="Kellogs" type="type2">
<hs id="0" begin="18" end="91" />
</node>
<node begin="22" end="23" val="Harry" type="type1">
<hs id="1" begin="108" end="520" />
</node>
<node begin="23" end="25" val="Potter" type="type1">
<hs id="1" begin="108" end="520" />
</node>
</ds>
我想合并所有 node.val
(按照它们在 XML
文件中出现的相同顺序] 按 hs id
).
比如上面数据的o/p应该是:
Name hs id Val
abc 0 Organic Cereal
abc 1 Harry Potter
这是我从数据块加载 XML 源代码的地方:
val df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "ds")
.option("attributePrefix", "")
.load(args(0))
df.registerTempTable("ds")
我不确定如何按 hs id
对数据集进行分组,并确保保留顺序。
val df_ds = sqlContext.sql("SELECT Name, node.type from ds")
尝试:
import scala.collection.mutable.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
val comb = udf((rows: Seq[Row]) => {
val result = LinkedHashMap[Long, Array[String]]()
for (row <- rows) {
val id = row.getAs[Row]("hs").getAs[Long]("id")
result(id) = result.getOrElse(id, Array[String]()) :+ row.getAs[String]("val")
}
result.values.toArray.map(x => x.mkString(" "))
})
df.printSchema
root
|-- Name: string (nullable = true)
|-- node: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- begin: long (nullable = true)
| | |-- end: long (nullable = true)
| | |-- hs: struct (nullable = true)
| | | |-- #VALUE: string (nullable = true)
| | | |-- begin: long (nullable = true)
| | | |-- end: long (nullable = true)
| | | |-- id: long (nullable = true)
| | |-- type: string (nullable = true)
| | |-- val: string (nullable = true)
df.withColumn("comb", comb(df("node")))
我正在使用来自 databricks
的 XML
来源。这是我的 XML
示例数据。
<ds Name="abc">
<node begin="18" end="22" val="Organic" type="type1">
<hs id="0" begin="18" end="91" />
</node>
<node begin="22" end="23" val="Cereal">
<hs id="0" begin="18" end="91" />
</node>
<node begin="23" end="25" val="Kellogs" type="type2">
<hs id="0" begin="18" end="91" />
</node>
<node begin="22" end="23" val="Harry" type="type1">
<hs id="1" begin="108" end="520" />
</node>
<node begin="23" end="25" val="Potter" type="type1">
<hs id="1" begin="108" end="520" />
</node>
</ds>
我想合并所有 node.val
(按照它们在 XML
文件中出现的相同顺序] 按 hs id
).
比如上面数据的o/p应该是:
Name hs id Val
abc 0 Organic Cereal
abc 1 Harry Potter
这是我从数据块加载 XML 源代码的地方:
val df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "ds")
.option("attributePrefix", "")
.load(args(0))
df.registerTempTable("ds")
我不确定如何按 hs id
对数据集进行分组,并确保保留顺序。
val df_ds = sqlContext.sql("SELECT Name, node.type from ds")
尝试:
import scala.collection.mutable.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
val comb = udf((rows: Seq[Row]) => {
val result = LinkedHashMap[Long, Array[String]]()
for (row <- rows) {
val id = row.getAs[Row]("hs").getAs[Long]("id")
result(id) = result.getOrElse(id, Array[String]()) :+ row.getAs[String]("val")
}
result.values.toArray.map(x => x.mkString(" "))
})
df.printSchema
root
|-- Name: string (nullable = true)
|-- node: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- begin: long (nullable = true)
| | |-- end: long (nullable = true)
| | |-- hs: struct (nullable = true)
| | | |-- #VALUE: string (nullable = true)
| | | |-- begin: long (nullable = true)
| | | |-- end: long (nullable = true)
| | | |-- id: long (nullable = true)
| | |-- type: string (nullable = true)
| | |-- val: string (nullable = true)
df.withColumn("comb", comb(df("node")))