如何读取 Spark 中的嵌套集合
How to read a nested collection in Spark
我有一个镶木地板 table,其中一列是
, array<struct<col1,col2,..colN>>
可以 运行 使用 LATERAL VIEW 语法在 Hive 中针对这个 table 查询。
如何将此 table 读入 RDD,更重要的是如何在 Spark 中过滤、映射此嵌套集合?
在 Spark 文档中找不到对此的任何引用。提前感谢您提供任何信息!
ps。我觉得在 table 上提供一些统计数据可能会有所帮助。
主 table ~600 中的列数。行数 ~200m。
嵌套集合中 "columns" 的数量 ~10。嵌套集合中的平均记录数约为 35。
嵌套集合没有什么神奇之处。 Spark 将以相同的方式处理 RDD[(String, String)]
和 RDD[(String, Seq[String])]
。
不过,从 Parquet 文件中读取此类嵌套集合可能很棘手。
让我们以 spark-shell
(1.3.1) 为例:
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
写入拼花文件:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
读取拼花文件:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
重要的部分是row.getAs[Seq[Row]](1)
。 struct
的嵌套序列的内部表示是 ArrayBuffer[Row]
,你可以使用它的任何超类型来代替 Seq[Row]
。 1
是外行中的列索引。我在这里使用了 getAs
方法,但在最新版本的 Spark 中有替代方法。参见Row trait.
的源代码
现在您已经有了 RDD[Outer]
,您可以应用任何想要的转换或操作。
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
请注意,我们仅使用 spark-SQL 库来读取 parquet 文件。例如,在将其映射到 RDD 之前,您可以 select 直接在 DataFrame 上只包含想要的列。
dataFrame.select('col1, 'col2).map { row => ... }
我将给出一个基于 Python 的答案,因为这就是我正在使用的。我认为 Scala 也有类似的东西。
根据 Python API docs.
,Spark 1.4.0 中添加了 explode
函数来处理 DataFrame 中的嵌套数组
创建测试数据框:
from pyspark.sql import Row
df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()
## +-+--------------------+
## |a| intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+
使用explode
展平列表列:
from pyspark.sql.functions import explode
df.select(df.a, explode(df.intlist)).show()
## +-+---+
## |a|_c0|
## +-+---+
## |1| 1|
## |1| 2|
## |1| 3|
## |2| 4|
## |2| 5|
## |2| 6|
## +-+---+
另一种方法是使用这样的模式匹配:
val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match {
case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
case List(a:String, b: String) => (a, b)
}).toList
})
您可以直接在 Row 上进行模式匹配,但由于某些原因可能会失败。
以上答案都是很好的答案,从不同的角度解决了这个问题; Spark SQL 也是访问嵌套数据的一种非常有用的方法。
下面是如何在 SQL 中直接使用 explode() 来查询嵌套集合的示例。
SELECT hholdid, tsp.person_seq_no
FROM ( SELECT hholdid, explode(tsp_ids) as tsp
FROM disc_mrt.unified_fact uf
)
tsp_ids 是结构的嵌套,它有很多属性,包括我在上面的外部查询中选择的 person_seq_no。
以上是在 Spark 2.0 中测试的。我做了一个小测试,它在 Spark 1.6 中不起作用。这个问题是在 Spark 2 不存在时提出的,因此这个答案很好地添加到处理嵌套结构的可用选项列表中。
另请查看以下 JIRAs for Hive-compatible way to query nested data using LATERAL VIEW OUTER
syntax,因为 Spark 2.2 还支持 OUTER
explode(例如,当嵌套集合为空时,但您仍然希望拥有来自父记录的属性):
针对 SQL 访问权限的 explode() 上的 JIRA 明显未解决:
我有一个镶木地板 table,其中一列是
, array<struct<col1,col2,..colN>>
可以 运行 使用 LATERAL VIEW 语法在 Hive 中针对这个 table 查询。
如何将此 table 读入 RDD,更重要的是如何在 Spark 中过滤、映射此嵌套集合?
在 Spark 文档中找不到对此的任何引用。提前感谢您提供任何信息!
ps。我觉得在 table 上提供一些统计数据可能会有所帮助。 主 table ~600 中的列数。行数 ~200m。 嵌套集合中 "columns" 的数量 ~10。嵌套集合中的平均记录数约为 35。
嵌套集合没有什么神奇之处。 Spark 将以相同的方式处理 RDD[(String, String)]
和 RDD[(String, Seq[String])]
。
不过,从 Parquet 文件中读取此类嵌套集合可能很棘手。
让我们以 spark-shell
(1.3.1) 为例:
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
写入拼花文件:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
读取拼花文件:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
重要的部分是row.getAs[Seq[Row]](1)
。 struct
的嵌套序列的内部表示是 ArrayBuffer[Row]
,你可以使用它的任何超类型来代替 Seq[Row]
。 1
是外行中的列索引。我在这里使用了 getAs
方法,但在最新版本的 Spark 中有替代方法。参见Row trait.
现在您已经有了 RDD[Outer]
,您可以应用任何想要的转换或操作。
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
请注意,我们仅使用 spark-SQL 库来读取 parquet 文件。例如,在将其映射到 RDD 之前,您可以 select 直接在 DataFrame 上只包含想要的列。
dataFrame.select('col1, 'col2).map { row => ... }
我将给出一个基于 Python 的答案,因为这就是我正在使用的。我认为 Scala 也有类似的东西。
根据 Python API docs.
,Spark 1.4.0 中添加了explode
函数来处理 DataFrame 中的嵌套数组
创建测试数据框:
from pyspark.sql import Row
df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()
## +-+--------------------+
## |a| intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+
使用explode
展平列表列:
from pyspark.sql.functions import explode
df.select(df.a, explode(df.intlist)).show()
## +-+---+
## |a|_c0|
## +-+---+
## |1| 1|
## |1| 2|
## |1| 3|
## |2| 4|
## |2| 5|
## |2| 6|
## +-+---+
另一种方法是使用这样的模式匹配:
val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match {
case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
case List(a:String, b: String) => (a, b)
}).toList
})
您可以直接在 Row 上进行模式匹配,但由于某些原因可能会失败。
以上答案都是很好的答案,从不同的角度解决了这个问题; Spark SQL 也是访问嵌套数据的一种非常有用的方法。
下面是如何在 SQL 中直接使用 explode() 来查询嵌套集合的示例。
SELECT hholdid, tsp.person_seq_no
FROM ( SELECT hholdid, explode(tsp_ids) as tsp
FROM disc_mrt.unified_fact uf
)
tsp_ids 是结构的嵌套,它有很多属性,包括我在上面的外部查询中选择的 person_seq_no。
以上是在 Spark 2.0 中测试的。我做了一个小测试,它在 Spark 1.6 中不起作用。这个问题是在 Spark 2 不存在时提出的,因此这个答案很好地添加到处理嵌套结构的可用选项列表中。
另请查看以下 JIRAs for Hive-compatible way to query nested data using LATERAL VIEW OUTER
syntax,因为 Spark 2.2 还支持 OUTER
explode(例如,当嵌套集合为空时,但您仍然希望拥有来自父记录的属性):
针对 SQL 访问权限的 explode() 上的 JIRA 明显未解决: