How to split comma-separated multiple columns into multiple rows?

I have a data frame with N fields, as described below. The number of columns and the length of the values will vary.

Input table:

+--------------+-----------+-----------------------+
|Date          |Amount     |Status                 |
+--------------+-----------+-----------------------+
|2019,2018,2017|100,200,300|IN,PRE,POST            |
|2018          |73         |IN                     |
|2018,2017     |56,89      |IN,PRE                 |
+--------------+-----------+-----------------------+

I have to convert it into the following format, with a sequence column added.

Expected output table:

+------+------+------+---------+
|Date  |Amount|Status| Sequence|
+------+------+------+---------+
|2019  |100   |IN    |   1     |
|2018  |200   |PRE   |   2     |
|2017  |300   |POST  |   3     |
|2018  |73    |IN    |   1     |
|2018  |56    |IN    |   1     |
|2017  |89    |PRE   |   2     |
+------+------+------+---------+

I tried using explode, but explode only takes one array at a time.

var df = dataRefined.withColumn("TOT_OVRDUE_TYPE", explode(split($"TOT_OVRDUE_TYPE", ","))).toDF

var df1 = df.withColumn("TOT_OD_TYPE_AMT", explode(split($"TOT_OD_TYPE_AMT", ","))).show

Does anyone know how I can do this? Thank you for the help.

Assuming the number of data elements in each column is the same for every row:

First, I recreated your DataFrame:

import org.apache.spark.sql._
import scala.collection.mutable.ListBuffer
import spark.implicits._  // needed for .toDF on local Seqs and RDDs

val df = Seq(("2019,2018,2017", "100,200,300", "IN,PRE,POST"), ("2018", "73", "IN"),
  ("2018,2017", "56,89", "IN,PRE")).toDF("Date", "Amount", "Status")

Next, I split the rows, add a sequence value, and then convert back to a DF:

val exploded = df.rdd.flatMap(row => {
  val buffer = new ListBuffer[(String, String, String, Int)]
  // split each comma-separated string column into its pieces
  val dateSplit = row(0).toString.split(",", -1)
  val amountSplit = row(1).toString.split(",", -1)
  val statusSplit = row(2).toString.split(",", -1)
  val seqSize = dateSplit.size
  for (i <- 0 until seqSize)
    buffer += Tuple4(dateSplit(i), amountSplit(i), statusSplit(i), i + 1)
  buffer.toList
}).toDF((df.columns :+ "Sequence"): _*)

I'm sure there are other ways to do this without first converting the DF to an RDD, but this still results in a DF with the correct answer.
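
For example, a DataFrame-only sketch (assuming Spark 2.4+ so arrays_zip is available, and relying on the zipped struct exposing its fields as "0", "1", "2") could look like this:

import org.apache.spark.sql.functions.{arrays_zip, col, posexplode, split}

// Split each string column into an array, zip the arrays element-wise,
// then posexplode to get the position alongside the zipped values.
val noRddDF = df
  .select(posexplode(arrays_zip(split(col("Date"), ","), split(col("Amount"), ","), split(col("Status"), ","))))
  .select(
    col("col").getField("0").as("Date"),
    col("col").getField("1").as("Amount"),
    col("col").getField("2").as("Status"),
    (col("pos") + 1).as("Sequence"))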

Let me know if you have any questions.

Yes, I personally also find explode a bit annoying; in your case I would probably go with flatMap instead:

import spark.implicits._
import org.apache.spark.sql.Row
val df = spark.sparkContext.parallelize(Seq(
  (Seq(2019, 2018, 2017), Seq(100, 200, 300), Seq("IN", "PRE", "POST")),
  (Seq(2018), Seq(73), Seq("IN")),
  (Seq(2018, 2017), Seq(56, 89), Seq("IN", "PRE"))
)).toDF()

val transformedDF = df
  .flatMap{case Row(dates: Seq[Int], amounts: Seq[Int], statuses: Seq[String]) =>
     dates.indices.map(index => (dates(index), amounts(index), statuses(index), index+1))}
  .toDF("Date", "Amount", "Status", "Sequence")

Output:

transformedDF.show
+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
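
One caveat: this example starts from columns that are already arrays. If your columns are comma-separated strings as in the question, a rough pre-step could split them first (stringDF is just a placeholder for your original DataFrame); note that after splitting, all three arrays hold strings, so the pattern in the flatMap above would need Seq[String] for every column:

import org.apache.spark.sql.functions.{col, split}

// Hypothetical pre-step: turn the comma-separated string columns into array columns.
val arrayDF = stringDF.select(
  split(col("Date"), ",").as("Date"),
  split(col("Amount"), ",").as("Amount"),
  split(col("Status"), ",").as("Status"))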

Here is another approach that uses posexplode on each column and joins all of the resulting data frames into one:

import org.apache.spark.sql.functions.{posexplode, monotonically_increasing_id, col}

val df = Seq(
  (Seq("2019", "2018", "2017"), Seq("100", "200", "300"), Seq("IN", "PRE", "POST")),
  (Seq("2018"), Seq("73"), Seq("IN")),
  (Seq("2018", "2017"), Seq("56", "89"), Seq("IN", "PRE")))
.toDF("Date","Amount", "Status")
.withColumn("idx", monotonically_increasing_id)

df.columns.filter(_ != "idx").map{
  c => df.select($"idx", posexplode(col(c))).withColumnRenamed("col", c)
}
.reduce((ds1, ds2) => ds1.join(ds2, Seq("idx", "pos")))
.select($"Date", $"Amount", $"Status", $"pos".plus(1).as("Sequence"))
.show

Output:

+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
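
The monotonically_increasing_id column acts as a per-row key here, so the posexplode result for each column can be joined back to the others on (idx, pos).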

You can achieve this by using the DataFrame built-in functions arrays_zip, split, and posexplode.

Explanation:

scala> val df = Seq(("2019,2018,2017", "100,200,300", "IN,PRE,POST"), ("2018", "73", "IN"), ("2018,2017", "56,89", "IN,PRE")).toDF("date", "amount", "status")

scala> :paste
df.selectExpr("""posexplode(
                            arrays_zip(
                                        split(date,','),    -- split the date string on ',' to create an array
                                        split(amount,','),
                                        split(status,','))) -- zip the arrays element-wise
                            as (p,colum)                    -- posexplode on the zipped arrays gives position and value
            """)
    .selectExpr("colum.`0` as Date",   // field 0 of the zipped struct is the date
                "colum.`1` as Amount",
                "colum.`2` as Status",
                "p+1 as Sequence")     // add 1 to the position value
    .show()

Result:

+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
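
(As far as I know, arrays_zip is only available in Spark 2.4 and later.)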

I used a transpose to zip all the sequences by position and then did a posexplode. The selects on the DataFrame are dynamic, to satisfy the condition stated in the question that the number of columns and the length of the values will vary.

import org.apache.spark.sql.functions._


val df = Seq(
  ("2019,2018,2017", "100,200,300", "IN,PRE,POST"),
  ("2018", "73", "IN"),
  ("2018,2017", "56,89", "IN,PRE")
).toDF("Date", "Amount", "Status")
df: org.apache.spark.sql.DataFrame = [Date: string, Amount: string ... 1 more field]

scala> df.show(false)
+--------------+-----------+-----------+
|Date          |Amount     |Status     |
+--------------+-----------+-----------+
|2019,2018,2017|100,200,300|IN,PRE,POST|
|2018          |73         |IN         |
|2018,2017     |56,89      |IN,PRE     |
+--------------+-----------+-----------+


scala> def transposeSeqOfSeq[S](x:Seq[Seq[S]]): Seq[Seq[S]] = { x.transpose }
transposeSeqOfSeq: [S](x: Seq[Seq[S]])Seq[Seq[S]]

scala> val myUdf = udf { transposeSeqOfSeq[String] _}
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(ArrayType(StringType,true),true),Some(List(ArrayType(ArrayType(StringType,true),true))))

scala> val df2 = df.select(df.columns.map(c => split(col(c), ",") as c): _*)
df2: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 1 more field]

scala> df2.show(false)
+------------------+---------------+---------------+
|Date              |Amount         |Status         |
+------------------+---------------+---------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|
|[2018]            |[73]           |[IN]           |
|[2018, 2017]      |[56, 89]       |[IN, PRE]      |
+------------------+---------------+---------------+


scala> val df3 = df2.withColumn("allcols", array(df.columns.map(c => col(c)): _*))
df3: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 2 more fields]

scala> df3.show(false)
+------------------+---------------+---------------+------------------------------------------------------+
|Date              |Amount         |Status         |allcols                                               |
+------------------+---------------+---------------+------------------------------------------------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|[[2019, 2018, 2017], [100, 200, 300], [IN, PRE, POST]]|
|[2018]            |[73]           |[IN]           |[[2018], [73], [IN]]                                  |
|[2018, 2017]      |[56, 89]       |[IN, PRE]      |[[2018, 2017], [56, 89], [IN, PRE]]                   |
+------------------+---------------+---------------+------------------------------------------------------+


scala> val df4 = df3.withColumn("ab", myUdf($"allcols")).select($"ab", posexplode($"ab"))
df4: org.apache.spark.sql.DataFrame = [ab: array<array<string>>, pos: int ... 1 more field]

scala> df4.show(false)
+------------------------------------------------------+---+-----------------+
|ab                                                    |pos|col              |
+------------------------------------------------------+---+-----------------+
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|0  |[2019, 100, IN]  |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|1  |[2018, 200, PRE] |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|2  |[2017, 300, POST]|
|[[2018, 73, IN]]                                      |0  |[2018, 73, IN]   |
|[[2018, 56, IN], [2017, 89, PRE]]                     |0  |[2018, 56, IN]   |
|[[2018, 56, IN], [2017, 89, PRE]]                     |1  |[2017, 89, PRE]  |
+------------------------------------------------------+---+-----------------+

scala> val selCols = (0 until df.columns.length).map(i => $"col".getItem(i).as(df.columns(i))) :+ ($"pos"+1).as("Sequence")
selCols: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(col[0] AS `Date`, col[1] AS `Amount`, col[2] AS `Status`, (pos + 1) AS `Sequence`)

scala> df4.select(selCols:_*).show(false)
+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|100   |IN    |1       |
|2018|200   |PRE   |2       |
|2017|300   |POST  |3       |
|2018|73    |IN    |1       |
|2018|56    |IN    |1       |
|2017|89    |PRE   |2       |
+----+------+------+--------+

This is why I like the spark-core API. Many problems can be solved with just map and flatMap. Simply pass your df and an instance of SQLContext to the method below and it will give the desired result:

import org.apache.spark.sql.{DataFrame, Row, SQLContext}

def reShapeDf(df: DataFrame, sqlContext: SQLContext): DataFrame = {

    // Pull the three comma-separated string columns out of each Row
    val rdd = df.rdd.map(m => (m.getAs[String](0), m.getAs[String](1), m.getAs[String](2)))

    // Split each column and zip the pieces together positionally
    val rdd1 = rdd.flatMap(a => a._1.split(",").zip(a._2.split(",")).zip(a._3.split(",")))
    val rdd2 = rdd1.map {
      case ((a, b), c) => (a, b, c)
    }

    // Rebuild a DataFrame with the original three-column schema;
    // note this version does not add the Sequence column.
    sqlContext.createDataFrame(rdd2.map(m => Row.fromTuple(m)), df.schema)
}
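
A quick usage sketch, assuming an existing SparkSession named spark and the three-column string DataFrame from the question:

// Hypothetical call site
val reshaped = reShapeDf(df, spark.sqlContext)
reshaped.show(false)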