如何使用 pivot table 创建一个新列?
How to create a new column using pivot table?
我有一个 DataFrame,其列 typecol
的值为 22
和 33
。
然后我用下面的代码做一个枢轴 table:
var result = df
.groupBy(col("product_id"))
.pivot("typecol")
.agg(count("*"))
.na.fill(0)
.withColumnRenamed("22", "A_type")
.withColumnRenamed("33", "B_type")
但是,当值 22
或 33
不存在于 df
中时,不会创建列 A_type
和 B_type
。我怎样才能用值 0
创建它们?
您可以 assemble 您想要无条件旋转的列连同它们在 Map
中的最终列名称,并使用 foldLeft
遍历地图以相应地重命名和零填充如下:
import org.apache.spark.sql.functions._
val df = Seq(
(1, "22"),
(1, "22"),
(2, "22"),
(2, "22"),
(2, "22")
).toDF("id", "type")
val pivotCols = Map("22"->"a_type", "33"->"b_type")
val pivotDF = df.groupBy("id").pivot("type").agg(count($"type")).na.fill(0)
val resultDF = pivotCols.keys.foldLeft( pivotDF )( (df, c) =>
if ( df.columns contains c ) df.withColumnRenamed(c, pivotCols(c)) else
df.withColumn(pivotCols(c), lit(0))
)
resultDF.show
// +---+------+------+
// | id|a_type|b_type|
// +---+------+------+
// | 1| 2| 0|
// | 2| 3| 0|
// +---+------+------+
我有一个 DataFrame,其列 typecol
的值为 22
和 33
。
然后我用下面的代码做一个枢轴 table:
var result = df
.groupBy(col("product_id"))
.pivot("typecol")
.agg(count("*"))
.na.fill(0)
.withColumnRenamed("22", "A_type")
.withColumnRenamed("33", "B_type")
但是,当值 22
或 33
不存在于 df
中时,不会创建列 A_type
和 B_type
。我怎样才能用值 0
创建它们?
您可以 assemble 您想要无条件旋转的列连同它们在 Map
中的最终列名称,并使用 foldLeft
遍历地图以相应地重命名和零填充如下:
import org.apache.spark.sql.functions._
val df = Seq(
(1, "22"),
(1, "22"),
(2, "22"),
(2, "22"),
(2, "22")
).toDF("id", "type")
val pivotCols = Map("22"->"a_type", "33"->"b_type")
val pivotDF = df.groupBy("id").pivot("type").agg(count($"type")).na.fill(0)
val resultDF = pivotCols.keys.foldLeft( pivotDF )( (df, c) =>
if ( df.columns contains c ) df.withColumnRenamed(c, pivotCols(c)) else
df.withColumn(pivotCols(c), lit(0))
)
resultDF.show
// +---+------+------+
// | id|a_type|b_type|
// +---+------+------+
// | 1| 2| 0|
// | 2| 3| 0|
// +---+------+------+