Transposing rows into columns in Spark with Scala
I have a dataframe with fixed columns m1_amt through m4_amt, containing data in the following format:
+------+----------+----------+----------+-----------+
|Entity| m1_amt | m2_amt | m3_amt | m4_amt |
+------+----------+----------+----------+-----------+
| ISO | 1 | 2 | 3 | 4 |
| TEST | 5 | 6 | 7 | 8 |
| Beta | 9 | 10 | 11 | 12 |
+------+----------+----------+----------+-----------+
I am trying to transpose each row into a new column, like this:
+----------+-------+--------+------+
| Entity | ISO | TEST | Beta |
+----------+-------+--------+------+
| m1_amt | 1 | 5 | 9 |
| m2_amt | 2 | 6 | 10 |
| m3_amt | 3 | 7 | 11 |
| m4_amt | 4 | 8 | 12 |
+----------+-------+--------+------+
How can I achieve this in Spark with Scala?
scala> df.show
+------+------+------+------+------+
|Entity|m1_amt|m2_amt|m3_amt|m4_amt|
+------+------+------+------+------+
| ISO| 1| 2| 3| 4|
| TEST| 5| 6| 7| 8|
| Beta| 9| 10| 11| 12|
+------+------+------+------+------+
scala> val df1 = df.withColumn("amt", to_json(struct(col("m1_amt"),col("m2_amt"),col("m3_amt"),col("m4_amt"))))
.withColumn("amt", regexp_replace(col("amt"), """[\{\"\}]""", ""))
.withColumn("amt", explode(split(col("amt"), ",")))
.withColumn("cols", split(col("amt"), ":")(0))
.withColumn("val", split(col("amt"), ":")(1))
.select("Entity","cols","val")
scala> df1.show
+------+------+---+
|Entity| cols|val|
+------+------+---+
| ISO|m1_amt| 1|
| ISO|m2_amt| 2|
| ISO|m3_amt| 3|
| ISO|m4_amt| 4|
| TEST|m1_amt| 5|
| TEST|m2_amt| 6|
| TEST|m3_amt| 7|
| TEST|m4_amt| 8|
| Beta|m1_amt| 9|
| Beta|m2_amt| 10|
| Beta|m3_amt| 11|
| Beta|m4_amt| 12|
+------+------+---+
scala> df1.groupBy(col("cols")).pivot("Entity").agg(concat_ws("",collect_set(col("val"))))
.withColumnRenamed("cols", "Entity")
.show()
+------+----+---+----+
|Entity|Beta|ISO|TEST|
+------+----+---+----+
|m3_amt| 11| 3| 7|
|m4_amt| 12| 4| 8|
|m2_amt| 10| 2| 6|
|m1_amt| 9| 1| 5|
+------+----+---+----+
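The to_json / regexp_replace / split / explode chain above is just a way of melting each row into (Entity, column-name, value) triples before pivoting. As a sketch of what that melt step produces, the same transformation can be traced on plain Scala collections (no Spark needed; the column names and sample row are taken from the question):

```scala
// Plain-Scala sketch of the melt step performed by
// to_json + regexp_replace + split + explode on a single row.
object MeltSketch {
  def main(args: Array[String]): Unit = {
    val cols = Seq("m1_amt", "m2_amt", "m3_amt", "m4_amt")
    val row  = ("ISO", Seq(1, 2, 3, 4))

    // to_json(struct(...)) yields {"m1_amt":1,...}; after stripping the
    // braces and quotes it is "m1_amt:1,m2_amt:2,m3_amt:3,m4_amt:4".
    val amt = cols.zip(row._2).map { case (c, v) => s"$c:$v" }.mkString(",")

    // explode(split(amt, ",")) gives one row per name:value pair;
    // splitting each pair on ":" separates "cols" from "val".
    val melted = amt.split(",").map { pair =>
      val Array(c, v) = pair.split(":")
      (row._1, c, v)
    }
    melted.foreach(println)  // (ISO,m1_amt,1) ... (ISO,m4_amt,4)
  }
}
```

One caveat with the aggregation that follows: `collect_set` deduplicates, so if two different source columns held the same value for an entity, `concat_ws` would see only one copy. `collect_list` (as used in the second answer) avoids that.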
I tried the following approach:
scala> val df=Seq(("ISO",1,2,3,4),
| ("TEST",5,6,7,8),
| ("Beta",9,10,11,12)).toDF("Entity","m1_amt","m2_amt","m3_amt","m4_amt")
df: org.apache.spark.sql.DataFrame = [Entity: string, m1_amt: int ... 3 more fields]
scala> df.show
+------+------+------+------+------+
|Entity|m1_amt|m2_amt|m3_amt|m4_amt|
+------+------+------+------+------+
| ISO| 1| 2| 3| 4|
| TEST| 5| 6| 7| 8|
| Beta| 9| 10| 11| 12|
+------+------+------+------+------+
scala> val selectDf= df.selectExpr("Entity","stack(4,'m1_amt',m1_amt,'m2_amt',m2_amt,'m3_amt',m3_amt,'m4_amt',m4_amt)")
selectDf: org.apache.spark.sql.DataFrame = [Entity: string, col0: string ... 1 more field]
scala> selectDf.show
+------+------+----+
|Entity| col0|col1|
+------+------+----+
| ISO|m1_amt| 1|
| ISO|m2_amt| 2|
| ISO|m3_amt| 3|
| ISO|m4_amt| 4|
| TEST|m1_amt| 5|
| TEST|m2_amt| 6|
| TEST|m3_amt| 7|
| TEST|m4_amt| 8|
| Beta|m1_amt| 9|
| Beta|m2_amt| 10|
| Beta|m3_amt| 11|
| Beta|m4_amt| 12|
+------+------+----+
scala> selectDf.groupBy("col0").pivot("Entity").agg(concat_ws("",collect_list(col("col1")))).withColumnRenamed("col0","Entity").show
+------+----+---+----+
|Entity|Beta|ISO|TEST|
+------+----+---+----+
|m3_amt| 11| 3| 7|
|m4_amt| 12| 4| 8|
|m2_amt| 10| 2| 6|
|m1_amt| 9| 1| 5|
+------+----+---+----+
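The `stack` + `pivot` combination used above is the standard melt-then-pivot pattern: `stack` unpivots the fixed columns into (Entity, col0, col1) triples, and `pivot("Entity")` fans the entities back out as columns. The pivot half can be sketched on plain Scala collections (no Spark needed; the data is a subset of the example):

```scala
// Plain-Scala sketch of the pivot step: group the melted triples by
// measure name, then turn each entity into a column of that row.
object PivotSketch {
  def main(args: Array[String]): Unit = {
    val melted = Seq(
      ("ISO",  "m1_amt", 1), ("ISO",  "m2_amt", 2),
      ("TEST", "m1_amt", 5), ("TEST", "m2_amt", 6),
      ("Beta", "m1_amt", 9), ("Beta", "m2_amt", 10)
    )
    // groupBy(col0) -> one output row per measure;
    // each entity becomes a key (i.e. a column) in that row.
    val pivoted = melted.groupBy(_._2).map { case (measure, cells) =>
      measure -> cells.map(c => c._1 -> c._3).toMap
    }
    println(pivoted("m1_amt"))  // e.g. Map(ISO -> 1, TEST -> 5, Beta -> 9)
  }
}
```

Note that, like the Spark version, this produces rows in no guaranteed order; add an `orderBy` (or sort the keys) if the m1_amt..m4_amt ordering matters in the output.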