如何将此数据集转换为以下数据集
How to transform this dataset to the following dataset
输入
+------+------+------+------+
|emp_name|emp_area| dept|zip|
+------+------+------+------+
|ram|USA|"Sales"|805912|
|sham|USA|"Sales"|805912|
|ram|Canada|"Marketing"|805912|
|ram|USA|"Sales"|805912|
|sham|USA|"Marketing"|805912|
+------+------+------+------
期望的输出
feature |Top1 name |Top 1 value1|Top2 name|top 2 value|
emp_name ram |3|sham |2
emp_area Usa |4|canada |1
dept sales|3|Marketing|3
zip 805912|5|NA|NA
我开始为它们中的每一个动态生成计数,但无法将它们存储在数据集中
val features=ds.columns.toList
for (e <- features) {
val ds1=ds.groupBy(e).count().sort(desc("count")).limit(5).withColumnRenamed("count", e+"_count")
}
现在如何将所有值收集到一个数据帧中并转换为输出?
这里有一个稍微冗长的方法。您可以将每一列 map
到具有一行的数据框,该行对应于所需输出中的行。如有必要,添加 NA 列。将列名转换为所需的列名,最后执行 unionAll
合并数据帧(每行一行)。
import org.apache.spark.sql.expressions.Window
val top = 2
val result = ds.columns.map(
c => ds.groupBy(c).count()
.withColumn("rn", row_number().over(Window.orderBy(desc("count"))))
.filter(s"rn <= $top")
.groupBy().pivot("rn")
.agg(first(col(c)), first(col("count")))
.select(lit(c), col("*"))
).map(df =>
if (df.columns.size != 1 + top*2)
df.select(List(col("*")) ::: (1 to (top*2+1 - df.columns.size)).toList.map(x => lit("NA")): _*)
else df
).map(df =>
df.toDF(List("feature") ::: (1 to top).toList.flatMap(x => Seq(s"top$x name", s"top$x value")): _*)
).reduce(_ unionAll _)
result.show
+--------+---------+----------+---------+----------+
| feature|top1 name|top1 value|top2 name|top2 value|
+--------+---------+----------+---------+----------+
|emp_name| ram| 3| sham| 2|
|emp_area| USA| 4| Canada| 1|
| dept| Sales| 3|Marketing| 2|
| zip| 805912| 5| NA| NA|
+--------+---------+----------+---------+----------+
输入
+------+------+------+------+
|emp_name|emp_area| dept|zip|
+------+------+------+------+
|ram|USA|"Sales"|805912|
|sham|USA|"Sales"|805912|
|ram|Canada|"Marketing"|805912|
|ram|USA|"Sales"|805912|
|sham|USA|"Marketing"|805912|
+------+------+------+------
期望的输出
feature |Top1 name |Top 1 value1|Top2 name|top 2 value|
emp_name ram |3|sham |2
emp_area Usa |4|canada |1
dept sales|3|Marketing|3
zip 805912|5|NA|NA
我开始为它们中的每一个动态生成计数,但无法将它们存储在数据集中
val features=ds.columns.toList
for (e <- features) {
val ds1=ds.groupBy(e).count().sort(desc("count")).limit(5).withColumnRenamed("count", e+"_count")
}
现在如何将所有值收集到一个数据帧中并转换为输出?
这里有一个稍微冗长的方法。您可以将每一列 map
到具有一行的数据框,该行对应于所需输出中的行。如有必要,添加 NA 列。将列名转换为所需的列名,最后执行 unionAll
合并数据帧(每行一行)。
import org.apache.spark.sql.expressions.Window
val top = 2
val result = ds.columns.map(
c => ds.groupBy(c).count()
.withColumn("rn", row_number().over(Window.orderBy(desc("count"))))
.filter(s"rn <= $top")
.groupBy().pivot("rn")
.agg(first(col(c)), first(col("count")))
.select(lit(c), col("*"))
).map(df =>
if (df.columns.size != 1 + top*2)
df.select(List(col("*")) ::: (1 to (top*2+1 - df.columns.size)).toList.map(x => lit("NA")): _*)
else df
).map(df =>
df.toDF(List("feature") ::: (1 to top).toList.flatMap(x => Seq(s"top$x name", s"top$x value")): _*)
).reduce(_ unionAll _)
result.show
+--------+---------+----------+---------+----------+
| feature|top1 name|top1 value|top2 name|top2 value|
+--------+---------+----------+---------+----------+
|emp_name| ram| 3| sham| 2|
|emp_area| USA| 4| Canada| 1|
| dept| Sales| 3|Marketing| 2|
| zip| 805912| 5| NA| NA|
+--------+---------+----------+---------+----------+