Spark SQL Column Manipulation
I have a dataset with the columns below.
df.show();
+--------+---------+---------+---------+---------+
| Col1   | Col2    | Expend1 | Expend2 | Expend3 |
+--------+---------+---------+---------+---------+
| Value1 | Cvalue1 | 123     | 2254    | 22      |
| Value1 | Cvalue2 | 124     | 2255    | 23      |
+--------+---------+---------+---------+---------+
I would like to change it into one of the following formats, using a join, a cube, or any other operation.
1.
+--------+---------+------+
| Value1 | Cvalue1 | 123  |
| Value1 | Cvalue1 | 2254 |
| Value1 | Cvalue1 | 22   |
| Value1 | Cvalue2 | 124  |
| Value1 | Cvalue2 | 2255 |
| Value1 | Cvalue2 | 23   |
+--------+---------+------+
Or, better still, this format:
2.
+--------+---------+---------+------+
| Value1 | Cvalue1 | Expend1 | 123  |
| Value1 | Cvalue1 | Expend2 | 2254 |
| Value1 | Cvalue1 | Expend3 | 22   |
| Value1 | Cvalue2 | Expend1 | 124  |
| Value1 | Cvalue2 | Expend2 | 2255 |
| Value1 | Cvalue2 | Expend3 | 23   |
+--------+---------+---------+------+
Is it possible to achieve either of the two formats above? And in the case of #1, can I also get the column name for each value, i.e. whether it is Expend1, Expend2, or Expend3?
You can convert the three columns into an array and explode it:
import org.apache.spark.sql.functions._

df.withColumn("Expend", explode(array("Expend1", "Expend2", "Expend3")))
  .drop("Expend1", "Expend2", "Expend3")
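Run against the sample dataframe, this should produce format #1 (note that the originating column name is lost, which the next step addresses):

+--------+---------+--------+
| Col1   | Col2    | Expend |
+--------+---------+--------+
| Value1 | Cvalue1 | 123    |
| Value1 | Cvalue1 | 2254   |
| Value1 | Cvalue1 | 22     |
| Value1 | Cvalue2 | 124    |
| Value1 | Cvalue2 | 2255   |
| Value1 | Cvalue2 | 23     |
+--------+---------+--------+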
To keep track of which column each value came from, you can do the following:
df.withColumn("Expend1", concat_ws(":", lit("Expend1"), $"Expend1"))
  .withColumn("Expend2", concat_ws(":", lit("Expend2"), $"Expend2"))
  .withColumn("Expend3", concat_ws(":", lit("Expend3"), $"Expend3"))
  .withColumn("Expend", explode(array("Expend1", "Expend2", "Expend3")))
  .drop("Expend1", "Expend2", "Expend3")
  .withColumn("ExpendColumn", split($"Expend", ":")(0))
  .withColumn("Expend", split($"Expend", ":")(1))
  .show(false)
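A minimal alternative sketch that avoids the string concat/split round trip, using only the built-in struct, array, and explode functions (the field names name and value are illustrative):

import org.apache.spark.sql.functions._

df.withColumn("e", explode(array(
    struct(lit("Expend1").as("name"), $"Expend1".as("value")),
    struct(lit("Expend2").as("name"), $"Expend2".as("value")),
    struct(lit("Expend3").as("name"), $"Expend3".as("value")))))
  .select($"Col1", $"Col2", $"e.name".as("ExpendColumn"), $"e.value".as("Expend"))
  .show(false)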
Hope this helps.
You can do this with the Hive stack function:
df.selectExpr(
  "Col1",
  "Col2",
  "stack(3, 'Expend1', Expend1, 'Expend2', Expend2, 'Expend3', Expend3) as (Expend, Value)"
).show(false)
+------+-------+-------+-----+
|Col1  |Col2   |Expend |Value|
+------+-------+-------+-----+
|Value1|Cvalue1|Expend1|123  |
|Value1|Cvalue1|Expend2|2254 |
|Value1|Cvalue1|Expend3|22   |
|Value1|Cvalue2|Expend1|124  |
|Value1|Cvalue2|Expend2|2255 |
|Value1|Cvalue2|Expend3|23   |
+------+-------+-------+-----+
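Because the stack expression is plain SQL text, it can also be generated programmatically, which scales better when there are many columns (a sketch; expendCols, pairs, and stackExpr are illustrative names):

val expendCols = Seq("Expend1", "Expend2", "Expend3")
val pairs = expendCols.map(c => s"'$c', $c").mkString(", ")
val stackExpr = s"stack(${expendCols.length}, $pairs) as (Expend, Value)"
df.selectExpr("Col1", "Col2", stackExpr).show(false)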
Using a udf function, you can get the second desired dataframe.
val columns = df.select("Expend1", "Expend2", "Expend3").columns

import org.apache.spark.sql.functions._

// Zip each column name with the matching row value into (name, value) pairs
def arrayStructUdf = udf((columnNames: collection.mutable.WrappedArray[String],
                          columnValues: collection.mutable.WrappedArray[String]) =>
  columnNames.zip(columnValues).toArray)
Then just call the udf function, drop the three extra columns, explode the newly formed column, and finally select the desired columns:
df.withColumn("new", arrayStructUdf(array(columns.map(x => lit(x)): _*), array(columns.map(col): _*)))
.drop("Expend1","Expend2","Expend3")
.withColumn("new", explode(col("new")))
.select("Col1","Col2", "new.*")
You should then have the second required dataframe:
+------+-------+-------+----+
|Col1  |Col2   |_1     |_2  |
+------+-------+-------+----+
|Value1|Cvalue1|Expend1|123 |
|Value1|Cvalue1|Expend2|2254|
|Value1|Cvalue1|Expend3|22  |
|Value1|Cvalue2|Expend1|124 |
|Value1|Cvalue2|Expend2|2255|
|Value1|Cvalue2|Expend3|23  |
+------+-------+-------+----+
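The generated struct fields come out as _1 and _2; if friendlier names are wanted, a small follow-up rename works (assuming the chained result above is bound to a val named unpivoted, a hypothetical name):

val renamed = unpivoted
  .withColumnRenamed("_1", "Expend")
  .withColumnRenamed("_2", "Value")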
A map followed by explode can also be used:
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._ // `spark` is the SparkSession (predefined in spark-shell)

val data = List(
  ("Value1", "Cvalue1", 123, 2254, 22),
  ("Value1", "Cvalue2", 124, 2255, 23)
)
val df = data.toDF("Col1", "Col2", "Expend1", "Expend2", "Expend3")

// Build the alternating key/value arguments for map(): lit(name), col(name), ...
val unpivotedColumns = List("Expend1", "Expend2", "Expend3")
val columnMapping = unpivotedColumns.foldLeft(new ArrayBuffer[Column]())((acc, current) => {
  acc += lit(current)
  acc += col(current)
})

val mapped = df.select($"Col1", $"Col2", map(columnMapping: _*).alias("result"))
val result = mapped.select($"Col1", $"Col2", explode($"result"))
result.show(false)
The result is:
+------+-------+-------+-----+
|Col1  |Col2   |key    |value|
+------+-------+-------+-----+
|Value1|Cvalue1|Expend1|123  |
|Value1|Cvalue1|Expend2|2254 |
|Value1|Cvalue1|Expend3|22   |
|Value1|Cvalue2|Expend1|124  |
|Value1|Cvalue2|Expend2|2255 |
|Value1|Cvalue2|Expend3|23   |
+------+-------+-------+-----+
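As an aside, the foldLeft can be collapsed into a flatMap that builds the same alternating name/value argument list (a sketch reusing df and unpivotedColumns from above):

val mappingCols = unpivotedColumns.flatMap(c => Seq(lit(c), col(c)))
df.select($"Col1", $"Col2", explode(map(mappingCols: _*)))
  .show(false)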