如何在 Scala 中将 table 转换为时间序列 table

How to pivot a table into a timeseries table in Scala

我有以下 table:

index   0       1       2       id  
1       9.69    1.18    0.59    62
2       7.38    2.18    0.87    62
3       10.02   1.16    0.29    62

我正在尝试转向像 table 这样的时间序列。

预期输出:

data                    id
[9.69, 7.38, 10.02]     62
[1.18, 2.18, 1.16]      62
[0.59, 0.87, 0.29]      62

我试过下面的代码

val table = df.groupBy(df.col("id")).pivot("index").sum("0").cache()
val tablets = table.map(x => new transform(1.until(x.length).map(x.getDouble(_)).toList, x.getString(0)))
case class transform(data:List[Double], start:String)

但它只给出了这个输出

[9.69, 7.38, 10.02]     62

如何遍历所有列并获得所需的输出 table?

class pivot (df: DataFrame) {


val col1Names = df.drop("id").columns.tail
val kv = explode(array(df.select(col1Names.map(col): _*).columns.map {
c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))

val tempdf = df.withColumn("kv", kv)
.select("index", "kv.k", "kv.v", "id")
.groupBy("id", "k")
.pivot("index")
.agg(first("v"))
.drop("k")

val col2Names = tempdf.columns.tail
val finaldf = tempdf.withColumn("data", array(col2Names.map(col): _*)).drop(col2Names: _*)

}

在您的解决方案中,您使用了 groupBysum,这将为每个组生成 aggregatedrow。这就是为什么你在结果中得到一个结果。

您的问题的解决方案有点复杂。我使用了 withColumn, explode, array, struct, pivot, groupBy, agg, drop, col, select and alias 的组合。以下是解决方案

   val df = Seq((1, 9.69,  1.18, 0.59, 62),
     (2, 7.38,  2.18, 0.87, 62),
     (3, 10.02, 1.16, 0.29, 62)).toDF("index", "0", "1", "2", "id")

如您的问题中所定义,您必须通过阅读上面的输入已经拥有如下数据框

+-----+-----+----+----+---+
|index|0    |1   |2   |id |
+-----+-----+----+----+---+
|1    |9.69 |1.18|0.59|62 |
|2    |7.38 |2.18|0.87|62 |
|3    |10.02|1.16|0.29|62 |
+-----+-----+----+----+---+

如果是,那么下面的解决方案应该有效。

val col1Names = df.drop("id").columns.tail
val kv = explode(array(df.select(col1Names.map(col): _*).columns.map {
  c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))

val tempdf = df.withColumn("kv", kv)
                .select("index", "kv.k", "kv.v", "id")
                .groupBy("id", "k")
                .pivot("index")
                .agg(first("v"))
                .orderBy("k")
                .drop("k")

  val col2Names = tempdf.columns.tail
  val finaldf = tempdf.withColumn("data", array(col2Names.map(col): _*)).drop(col2Names: _*).sort($"data".desc)

您应该得到以下输出

+---+-------------------+
|id |data               |
+---+-------------------+
|62 |[9.69, 7.38, 10.02]|
|62 |[1.18, 2.18, 1.16] |
|62 |[0.59, 0.87, 0.29] |
+---+-------------------+