如何在 Scala 中将 table 转换为时间序列 table
How to pivot a table into a timeseries table in Scala
我有以下 table:
index 0 1 2 id
1 9.69 1.18 0.59 62
2 7.38 2.18 0.87 62
3 10.02 1.16 0.29 62
我正在尝试转向像 table 这样的时间序列。
预期输出:
data id
[9.69, 7.38, 10.02] 62
[1.18, 2.18, 1.16] 62
[0.59, 0.87, 0.29] 62
我试过下面的代码
val table = df.groupBy(df.col("id")).pivot("index").sum("0").cache()
val tablets = table.map(x => new transform(1.until(x.length).map(x.getDouble(_)).toList, x.getString(0)))
case class transform(data:List[Double], start:String)
但它只给出了这个输出
[9.69, 7.38, 10.02] 62
如何遍历所有列并获得所需的输出 table?
class pivot (df: DataFrame) {
val col1Names = df.drop("id").columns.tail
val kv = explode(array(df.select(col1Names.map(col): _*).columns.map {
c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))
val tempdf = df.withColumn("kv", kv)
.select("index", "kv.k", "kv.v", "id")
.groupBy("id", "k")
.pivot("index")
.agg(first("v"))
.drop("k")
val col2Names = tempdf.columns.tail
val finaldf = tempdf.withColumn("data", array(col2Names.map(col): _*)).drop(col2Names: _*)
}
在您的解决方案中,您使用了 groupBy
和 sum
,这将为每个组生成 aggregated
个 row
。这就是为什么你在结果中得到一个结果。
您的问题的解决方案有点复杂。我使用了 withColumn, explode, array, struct, pivot, groupBy, agg, drop, col, select and alias
的组合。以下是解决方案
val df = Seq((1, 9.69, 1.18, 0.59, 62),
(2, 7.38, 2.18, 0.87, 62),
(3, 10.02, 1.16, 0.29, 62)).toDF("index", "0", "1", "2", "id")
如您的问题中所定义,您必须通过阅读上面的输入已经拥有如下数据框
+-----+-----+----+----+---+
|index|0 |1 |2 |id |
+-----+-----+----+----+---+
|1 |9.69 |1.18|0.59|62 |
|2 |7.38 |2.18|0.87|62 |
|3 |10.02|1.16|0.29|62 |
+-----+-----+----+----+---+
如果是,那么下面的解决方案应该有效。
val col1Names = df.drop("id").columns.tail
val kv = explode(array(df.select(col1Names.map(col): _*).columns.map {
c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))
val tempdf = df.withColumn("kv", kv)
.select("index", "kv.k", "kv.v", "id")
.groupBy("id", "k")
.pivot("index")
.agg(first("v"))
.orderBy("k")
.drop("k")
val col2Names = tempdf.columns.tail
val finaldf = tempdf.withColumn("data", array(col2Names.map(col): _*)).drop(col2Names: _*).sort($"data".desc)
您应该得到以下输出
+---+-------------------+
|id |data |
+---+-------------------+
|62 |[9.69, 7.38, 10.02]|
|62 |[1.18, 2.18, 1.16] |
|62 |[0.59, 0.87, 0.29] |
+---+-------------------+
我有以下 table:
index 0 1 2 id
1 9.69 1.18 0.59 62
2 7.38 2.18 0.87 62
3 10.02 1.16 0.29 62
我正在尝试转向像 table 这样的时间序列。
预期输出:
data id
[9.69, 7.38, 10.02] 62
[1.18, 2.18, 1.16] 62
[0.59, 0.87, 0.29] 62
我试过下面的代码
val table = df.groupBy(df.col("id")).pivot("index").sum("0").cache()
val tablets = table.map(x => new transform(1.until(x.length).map(x.getDouble(_)).toList, x.getString(0)))
case class transform(data:List[Double], start:String)
但它只给出了这个输出
[9.69, 7.38, 10.02] 62
如何遍历所有列并获得所需的输出 table?
class pivot (df: DataFrame) {
val col1Names = df.drop("id").columns.tail
val kv = explode(array(df.select(col1Names.map(col): _*).columns.map {
c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))
val tempdf = df.withColumn("kv", kv)
.select("index", "kv.k", "kv.v", "id")
.groupBy("id", "k")
.pivot("index")
.agg(first("v"))
.drop("k")
val col2Names = tempdf.columns.tail
val finaldf = tempdf.withColumn("data", array(col2Names.map(col): _*)).drop(col2Names: _*)
}
在您的解决方案中,您使用了 groupBy
和 sum
,这将为每个组生成 aggregated
个 row
。这就是为什么你在结果中得到一个结果。
您的问题的解决方案有点复杂。我使用了 withColumn, explode, array, struct, pivot, groupBy, agg, drop, col, select and alias
的组合。以下是解决方案
val df = Seq((1, 9.69, 1.18, 0.59, 62),
(2, 7.38, 2.18, 0.87, 62),
(3, 10.02, 1.16, 0.29, 62)).toDF("index", "0", "1", "2", "id")
如您的问题中所定义,您必须通过阅读上面的输入已经拥有如下数据框
+-----+-----+----+----+---+
|index|0 |1 |2 |id |
+-----+-----+----+----+---+
|1 |9.69 |1.18|0.59|62 |
|2 |7.38 |2.18|0.87|62 |
|3 |10.02|1.16|0.29|62 |
+-----+-----+----+----+---+
如果是,那么下面的解决方案应该有效。
val col1Names = df.drop("id").columns.tail
val kv = explode(array(df.select(col1Names.map(col): _*).columns.map {
c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))
val tempdf = df.withColumn("kv", kv)
.select("index", "kv.k", "kv.v", "id")
.groupBy("id", "k")
.pivot("index")
.agg(first("v"))
.orderBy("k")
.drop("k")
val col2Names = tempdf.columns.tail
val finaldf = tempdf.withColumn("data", array(col2Names.map(col): _*)).drop(col2Names: _*).sort($"data".desc)
您应该得到以下输出
+---+-------------------+
|id |data |
+---+-------------------+
|62 |[9.69, 7.38, 10.02]|
|62 |[1.18, 2.18, 1.16] |
|62 |[0.59, 0.87, 0.29] |
+---+-------------------+