Spark/scala - 我们可以从数据框中的现有列值创建新列吗
Spark/scala - can we create new columns from an existing column value in a dataframe
我正在尝试查看是否可以使用 spark/scala 从数据帧中的其中一列中的值创建新列。
我有一个包含以下数据的数据框
df.show()
+---+-----------------------+
|id |allvals |
+---+-----------------------+
|1 |col1,val11|col3,val31 |
|3 |col3,val33|col1,val13 |
|2 |col2,val22 |
+---+-----------------------+
在上面的数据中,col1/col2/col3 是列名,后面是它的值。列名和值由 ,
分隔。每组由 |
.
分隔
现在,我想达到这样的效果
+---+----------------------+------+------+------+
|id |allvals |col1 |col2 |col3 |
+---+----------------------+------+------+------+
|1 |col1,val11|col3,val31 |val11 |null |val31 |
|3 |col3,val33|col1,val13 |val13 |null |val13 |
|2 |col2,val22 |null |val22 |null |
+---+----------------------+------+------+------+
感谢任何帮助。
您可以使用 udf
:
将列转换为 Map
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
(1, "col1,val11|col3,val31"), (2, "col3,val33|col3,val13"), (2, "col2,val22")
).toDF("id", "allvals")
val to_map = udf((s: String) => s.split('|').collect { _.split(",") match {
case Array(k, v) => (k, v)
}}.toMap )
val dfWithMap = df.withColumn("allvalsmap", to_map($"allvals"))
val keys = dfWithMap.select($"allvalsmap").as[Map[String, String]].flatMap(_.keys.toSeq).distinct.collect
keys.foldLeft(dfWithMap)((df, k) => df.withColumn(k, $"allvalsmap".getItem(k))).drop("allvalsmap").show
// +---+--------------------+-----+-----+-----+
// | id| allvals| col3| col1| col2|
// +---+--------------------+-----+-----+-----+
// | 1|col1,val11|col3,v...|val31|val11| null|
// | 2|col3,val33|col3,v...|val13| null| null|
// | 2| col2,val22| null| null|val22|
// +---+--------------------+-----+-----+-----+
灵感来自 by user6910411。
您可以使用split
、explode
和groupBy/pivot/agg
转换DataFrame,如下:
val df = Seq(
(1, "col1,val11|col3,val31"),
(2, "col3,val33|col1,val13"),
(3, "col2,val22")
).toDF("id", "allvals")
import org.apache.spark.sql.functions._
df.withColumn("temp", split($"allvals", "\|")).
withColumn("temp", explode($"temp")).
withColumn("temp", split($"temp", ",")).
select($"id", $"allvals", $"temp".getItem(0).as("k"), $"temp".getItem(1).as("v")).
groupBy($"id", $"allvals").pivot("k").agg(first($"v"))
// +---+---------------------+-----+-----+-----+
// |id |allvals |col1 |col2 |col3 |
// +---+---------------------+-----+-----+-----+
// |1 |col1,val11|col3,val31|val11|null |val31|
// |3 |col2,val22 |null |val22|null |
// |2 |col3,val33|col1,val13|val13|null |val33|
// +---+---------------------+-----+-----+-----+
我正在尝试查看是否可以使用 spark/scala 从数据帧中的其中一列中的值创建新列。 我有一个包含以下数据的数据框
df.show()
+---+-----------------------+
|id |allvals |
+---+-----------------------+
|1 |col1,val11|col3,val31 |
|3 |col3,val33|col1,val13 |
|2 |col2,val22 |
+---+-----------------------+
在上面的数据中,col1/col2/col3 是列名,后面是它的值。列名和值由 ,
分隔。每组由 |
.
现在,我想达到这样的效果
+---+----------------------+------+------+------+
|id |allvals |col1 |col2 |col3 |
+---+----------------------+------+------+------+
|1 |col1,val11|col3,val31 |val11 |null |val31 |
|3 |col3,val33|col1,val13 |val13 |null |val13 |
|2 |col2,val22 |null |val22 |null |
+---+----------------------+------+------+------+
感谢任何帮助。
您可以使用 udf
:
Map
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
(1, "col1,val11|col3,val31"), (2, "col3,val33|col3,val13"), (2, "col2,val22")
).toDF("id", "allvals")
val to_map = udf((s: String) => s.split('|').collect { _.split(",") match {
case Array(k, v) => (k, v)
}}.toMap )
val dfWithMap = df.withColumn("allvalsmap", to_map($"allvals"))
val keys = dfWithMap.select($"allvalsmap").as[Map[String, String]].flatMap(_.keys.toSeq).distinct.collect
keys.foldLeft(dfWithMap)((df, k) => df.withColumn(k, $"allvalsmap".getItem(k))).drop("allvalsmap").show
// +---+--------------------+-----+-----+-----+
// | id| allvals| col3| col1| col2|
// +---+--------------------+-----+-----+-----+
// | 1|col1,val11|col3,v...|val31|val11| null|
// | 2|col3,val33|col3,v...|val13| null| null|
// | 2| col2,val22| null| null|val22|
// +---+--------------------+-----+-----+-----+
灵感来自
您可以使用split
、explode
和groupBy/pivot/agg
转换DataFrame,如下:
val df = Seq(
(1, "col1,val11|col3,val31"),
(2, "col3,val33|col1,val13"),
(3, "col2,val22")
).toDF("id", "allvals")
import org.apache.spark.sql.functions._
df.withColumn("temp", split($"allvals", "\|")).
withColumn("temp", explode($"temp")).
withColumn("temp", split($"temp", ",")).
select($"id", $"allvals", $"temp".getItem(0).as("k"), $"temp".getItem(1).as("v")).
groupBy($"id", $"allvals").pivot("k").agg(first($"v"))
// +---+---------------------+-----+-----+-----+
// |id |allvals |col1 |col2 |col3 |
// +---+---------------------+-----+-----+-----+
// |1 |col1,val11|col3,val31|val11|null |val31|
// |3 |col2,val22 |null |val22|null |
// |2 |col3,val33|col1,val13|val13|null |val33|
// +---+---------------------+-----+-----+-----+