How to split multi-value column into separate rows using typed Dataset?
I'm facing the problem of how to split a multi-value column, i.e. List[String], into separate rows.
The initial dataset has the following type: Dataset[(Integer, String, Double, scala.List[String])]
+---+--------------------+-------+--------------------+
| id|                text|  value|          properties|
+---+--------------------+-------+--------------------+
|  0|Lorem ipsum dolor...|    1.0|[prp1, prp2, prp3..]|
|  1|Lorem ipsum dolor...|    2.0|[prp4, prp5, prp6..]|
|  2|Lorem ipsum dolor...|    3.0|[prp7, prp8, prp9..]|
+---+--------------------+-------+--------------------+
The resulting dataset should have the following type:
Dataset[(Integer, String, Double, String)]
and the properties column should be split into separate rows:
+---+--------------------+-------+--------------------+
| id|                text|  value|            property|
+---+--------------------+-------+--------------------+
|  0|Lorem ipsum dolor...|    1.0|                prp1|
|  0|Lorem ipsum dolor...|    1.0|                prp2|
|  0|Lorem ipsum dolor...|    1.0|                prp3|
|  1|Lorem ipsum dolor...|    2.0|                prp4|
|  1|Lorem ipsum dolor...|    2.0|                prp5|
|  1|Lorem ipsum dolor...|    2.0|                prp6|
+---+--------------------+-------+--------------------+
You can use explode (note the source column is named properties, while the exploded result column is property):
df.withColumn("property", explode($"properties"))
Example:
val df = Seq((1, List("a", "b"))).toDF("A", "B")
// df: org.apache.spark.sql.DataFrame = [A: int, B: array<string>]
df.withColumn("B", explode($"B")).show
+---+---+
| A| B|
+---+---+
| 1| a|
| 1| b|
+---+---+
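Applied to the question's schema, a minimal sketch could look like the following (the local session setup and sample values are illustrative, not part of the original answer); dropping the original array column and calling .as gives back a typed Dataset:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// illustrative local session; in a real job the session already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explode-example")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  (0, "Lorem ipsum dolor", 1.0, List("prp1", "prp2", "prp3"))
).toDF("id", "text", "value", "properties")

// explode emits one row per element of the array column; drop the
// original column and convert back to a typed Dataset
val exploded = df
  .withColumn("property", explode($"properties"))
  .drop("properties")
  .as[(Integer, String, Double, String)]
```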
explode is the usual recommendation, but it comes from the untyped DataFrame API; if you work with Datasets, I think the flatMap operator might be a better fit (see org.apache.spark.sql.Dataset):
flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
(Scala-specific) Returns a new Dataset by first applying a function to all elements of this Dataset, and then flattening the results.
You could use it as follows:
val ds = Seq(
  (0, "Lorem ipsum dolor", 1.0, Array("prp1", "prp2", "prp3")))
  .toDF("id", "text", "value", "properties")
  .as[(Integer, String, Double, scala.List[String])]

scala> ds.flatMap { t =>
         t._4.map { prp => (t._1, t._2, t._3, prp) }
       }.show
+---+-----------------+---+----+
| _1| _2| _3| _4|
+---+-----------------+---+----+
| 0|Lorem ipsum dolor|1.0|prp1|
| 0|Lorem ipsum dolor|1.0|prp2|
| 0|Lorem ipsum dolor|1.0|prp3|
+---+-----------------+---+----+
// or just using a for-comprehension
for {
  t   <- ds
  prp <- t._4
} yield (t._1, t._2, t._3, prp)
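The same flatMap / for-comprehension duality holds on plain Scala collections, which is handy for unit-testing the per-row logic without a cluster. A minimal sketch (sample values are illustrative):

```scala
val rows = List(
  (0, "Lorem ipsum dolor", 1.0, List("prp1", "prp2", "prp3")),
  (1, "Lorem ipsum dolor", 2.0, List("prp4", "prp5"))
)

// flatMap: one output tuple per element of the properties list
val flattened = rows.flatMap { case (id, text, value, props) =>
  props.map(p => (id, text, value, p))
}

// equivalent for-comprehension (it desugars to flatMap + map)
val flattened2 = for {
  (id, text, value, props) <- rows
  p                        <- props
} yield (id, text, value, p)
```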
Here is one way to do it:
val myRDD = sc.parallelize(Array(
  (0, "text0", 1.0, List("prp1", "prp2", "prp3")),
  (1, "text1", 2.0, List("prp4", "prp5", "prp6")),
  (2, "text2", 3.0, List("prp7", "prp8", "prp9"))
)).map {
  case (i, t, v, ps) => ((i, t, v), ps)
}.flatMapValues(x => x).map {
  case ((i, t, v), p) => (i, t, v, p)
}
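flatMapValues duplicates the key once per element of the value collection. The same pattern can be sketched on plain Scala pairs to see what happens to each row (sample data is illustrative):

```scala
// key = (id, text, value), value = the list of properties
val pairs = List(
  ((0, "text0", 1.0), List("prp1", "prp2")),
  ((1, "text1", 2.0), List("prp3"))
)

// the collections equivalent of flatMapValues followed by re-flattening:
// each property gets its own row, with the key fields repeated
val result = pairs
  .flatMap { case (k, vs) => vs.map(v => (k, v)) }
  .map { case ((i, t, v), p) => (i, t, v, p) }
```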