Spark: How do I explode data and also add the column name, in PySpark or Scala Spark?
Spark: I want to explode multiple columns, merge them into a single column, and carry the source column's name into a separate column on each row.
Input data:
+-----------+-----------+-----------+
| ASMT_ID | WORKER | LABOR |
+-----------+-----------+-----------+
| 1 | A1,A2,A3| B1,B2 |
+-----------+-----------+-----------+
| 2 | A1,A4 | B1 |
+-----------+-----------+-----------+
Expected Output:
+-----------+-----------+-----------+
| ASMT_ID |WRK_CODE |WRK_DETL |
+-----------+-----------+-----------+
| 1 | A1 | WORKER |
+-----------+-----------+-----------+
| 1 | A2 | WORKER |
+-----------+-----------+-----------+
| 1 | A3 | WORKER |
+-----------+-----------+-----------+
| 1 | B1 | LABOR |
+-----------+-----------+-----------+
| 1 | B2 | LABOR |
+-----------+-----------+-----------+
| 2 | A1 | WORKER |
+-----------+-----------+-----------+
| 2 | A4 | WORKER |
+-----------+-----------+-----------+
| 2 | B1 | LABOR |
+-----------+-----------+-----------+
Probably not the cleanest approach, but a couple of explodes and a union (unionAll before Spark 2.0) are enough.
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for the $"col" syntax below
df1.show
+-------+--------+-----+
|ASMT_ID| WORKER|LABOR|
+-------+--------+-----+
| 1|A1,A2,A3|B1,B2|
| 2| A1,A4| B1|
+-------+--------+-----+
df1.cache
val workers = df1.drop("LABOR")
  .withColumn("WRK_CODE", explode(split($"WORKER", ",")))
  .withColumn("WRK_DETL", lit("WORKER"))
  .drop("WORKER")
val labors = df1.drop("WORKER")
  .withColumn("WRK_CODE", explode(split($"LABOR", ",")))
  .withColumn("WRK_DETL", lit("LABOR"))
  .drop("LABOR")
// union replaces the deprecated unionAll (Spark 2.0+)
workers.union(labors).orderBy($"ASMT_ID".asc, $"WRK_CODE".asc).show
+-------+--------+--------+
|ASMT_ID|WRK_CODE|WRK_DETL|
+-------+--------+--------+
| 1| A1| WORKER|
| 1| A2| WORKER|
| 1| A3| WORKER|
| 1| B1| LABOR|
| 1| B2| LABOR|
| 2| A1| WORKER|
| 2| A4| WORKER|
| 2| B1| LABOR|
+-------+--------+--------+