Generate dynamic header using Scala case class for Spark Table
I have an existing case class with many fields:

case class output(
  userId: String,
  timeStamp: String,
  ...
)

And I am using it to generate the header for a Spark job like this:
---------------------
userId | timeStamp
---------------------
1      | 2324444444
2      | 2334445556
Now I want to add more columns, and these columns will come from a map(attributeName, attributeValue), referred to as attributeNames. So my question is: how can I add a map to the case class, and how can I then use the map keys as column names to generate dynamic columns? After this, my final output should look like:
------------------------------------------------------
userId | timeStamp  | attributeName1 | attributeName2
------------------------------------------------------
1      | 2324444444 |                |
2      | 2334445554 |                |
You can do something like this:
case class output(
  userId: String,
  timeStamp: String,
  keyvalues: Map[String, String],
  ...
)

import spark.implicits._
import org.apache.spark.sql.functions._

// Load the input as a typed Dataset of the case class
// (adjust the read to match how your data is actually stored)
val df = spark.read.textFile(inputlocation).as[output]

// Collect the distinct map keys; each key will become a column
val keysDF = df.select(explode(map_keys($"keyvalues"))).distinct()
val keys = keysDF.collect().map(f => f.get(0))

// Turn each key into a column that looks up its value in the map
val keyCols = keys.map(f => col("keyvalues").getItem(f).as(f.toString))

// Fixed columns first, then the dynamically generated ones
df.select(col("userId") +: col("timeStamp") +: keyCols: _*)
Or you can have a look at this for other approaches.
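One such alternative (a sketch, assuming df is the Dataset[output] built above and the same imports are in scope) is to explode the map into key/value rows and pivot the keys into columns:

// Explode the map into (attrName, attrValue) rows, then pivot the keys into columns.
// first(...) keeps the single value per (userId, timeStamp, attrName) group.
val pivoted = df
  .select($"userId", $"timeStamp", explode($"keyvalues").as(Seq("attrName", "attrValue")))
  .groupBy("userId", "timeStamp")
  .pivot("attrName")
  .agg(first($"attrValue"))

pivoted.show()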