Pivoting DataFrame - Spark SQL

I have a DataFrame containing the following:

TradeId|Source
ABC|"USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602"

I want to pivot this data so that it becomes the following:

TradeId|CCY|PV
ABC|USD|333.123
ABC|USD|-789.444
ABC|GBP|1234.567

The number of CCY|PV|Date triplets in the "Source" column is not fixed. I could do this with an ArrayList, but that loads the data into the JVM and defeats the whole point of Spark.

Suppose my DataFrame looks like this:

DataFrame tradesSnap = this.loadTradesSnap(reportRequest);
String tempTable = getTempTableName();
tradesSnap.registerTempTable(tempTable);
tradesSnap = tradesSnap.sqlContext().sql("SELECT TradeId, Source FROM " + tempTable);

What you are trying to achieve looks more like a flatMap.

Simply put, using flatMap on a Dataset applies to each row a function (map) that itself produces a sequence of rows; the resulting groups of rows are then concatenated into a single sequence (flat).
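To make those semantics concrete before the full program, here is a minimal, self-contained toy sketch (the object name FlatMapToy and the sample data are just for illustration):

import org.apache.spark.sql.SparkSession

object FlatMapToy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("flatmap-toy").getOrCreate()
    import spark.implicits._

    // Each element maps to several elements, and flatMap concatenates
    // the resulting sequences: Dataset("a,b", "c") becomes Dataset("a", "b", "c").
    Seq("a,b", "c").toDS().flatMap(_.split(",")).show()

    spark.stop()
  }
}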

The following program shows the idea:

import org.apache.spark.sql.SparkSession

case class Input(TradeId: String, Source: String)

case class Output(TradeId: String, CCY: String, PV: String, Date: String)

object FlatMapExample {

  // This function will produce more rows of output for each line of input
  def splitSource(in: Input): Seq[Output] =
    in.Source.split("\|", -1).map {
      source =>
        println(source)
        val Array(ccy, pv, date) = source.split(",", -1)
        Output(in.TradeId, ccy, pv, date)
    }

  def main(args: Array[String]): Unit = {

    // Initialization and loading
    val spark = SparkSession.builder().master("local").appName("pivoting-example").getOrCreate()
    import spark.implicits._
    val input = spark.read.options(Map("sep" -> "|", "header" -> "true")).csv(args(0)).as[Input]

    // For each line in the input, split the source and then 
    // concatenate each "sub-sequence" in a single `Dataset`
    input.flatMap(splitSource).show
  }

}

Given your input, this would be the output:

+-------+---+--------+--------+
|TradeId|CCY|      PV|    Date|
+-------+---+--------+--------+
|    ABC|USD| 333.123|20170605|
|    ABC|USD|-789.444|20170605|
|    ABC|GBP|1234.567|20150602|
+-------+---+--------+--------+

You can now take the result and save it to a CSV, if you need to.
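For example, writing the flattened Dataset back out as pipe-separated CSV might look like this (a sketch; the output directory "trades-out" is a placeholder, not from the original post):

// Save the flattened rows as a pipe-separated CSV with a header.
input.flatMap(splitSource)
  .write
  .option("sep", "|")
  .option("header", "true")
  .csv("trades-out")  // placeholder output directory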

If you read the Databricks post on pivot, it says: "A pivot is an aggregation where one (or more in the general case) of the grouping columns has its distinct values transposed into individual columns." I guess that is not what you want.
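For contrast, an actual pivot would look roughly like this (a sketch assuming a hypothetical trades DataFrame that already has TradeId, CCY and PV columns; it transposes the distinct CCY values into columns rather than expanding rows):

import org.apache.spark.sql.functions.sum

// groupBy + pivot turns each distinct CCY value into its own column,
// aggregating PV per TradeId -- not the row expansion you want here.
trades.groupBy("TradeId").pivot("CCY").agg(sum("PV"))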

I would suggest you use withColumn and built-in functions to get the final output you desire. Given that the dataframe you have is

+-------+----------------------------------------------------------------+
|TradeId|Source                                                          |
+-------+----------------------------------------------------------------+
|ABC    |USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602|
+-------+----------------------------------------------------------------+

you can do the following with explode, split and withColumn to get the desired output:

import org.apache.spark.sql.functions.{col, explode, split}
import spark.implicits._  // for the $"..." column syntax

val explodedDF = dataframe.withColumn("Source", explode(split(col("Source"), "\\|")))
val finalDF = explodedDF.withColumn("CCY", split($"Source", ",")(0))
  .withColumn("PV", split($"Source", ",")(1))
  .withColumn("Date", split($"Source", ",")(2))
  .drop("Source")

finalDF.show(false)

The final output is:

+-------+---+--------+--------+
|TradeId|CCY|PV      |Date    |
+-------+---+--------+--------+
|ABC    |USD|333.123 |20170605|
|ABC    |USD|-789.444|20170605|
|ABC    |GBP|1234.567|20150602|
+-------+---+--------+--------+
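Note that split yields string columns, so CCY, PV and Date are all strings here; if you later need to aggregate on PV, you may want to cast it, for example (an optional extra step, not part of the answer above):

// Cast PV from string to double for numeric operations.
val typedDF = finalDF.withColumn("PV", col("PV").cast("double"))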

Hope this solves your issue.