如何使用 pivot table 创建一个新列?

How to create a new column using pivot table?

我有一个 DataFrame,其列 typecol 的值为 2233。 然后我用下面的代码做一个枢轴 table:

var result = df
        .groupBy(col("product_id"))
        .pivot("typecol")
        .agg(count("*"))
        .na.fill(0)
        .withColumnRenamed("22", "A_type")
        .withColumnRenamed("33", "B_type")

但是,当值 2233 不存在于 df 中时,不会创建列 A_typeB_type。我怎样才能用值 0 创建它们?

您可以 assemble 您想要无条件旋转的列连同它们在 Map 中的最终列名称,并使用 foldLeft 遍历地图以相应地重命名和零填充如下:

import org.apache.spark.sql.functions._

val df = Seq(
  (1, "22"),
  (1, "22"),
  (2, "22"),
  (2, "22"),
  (2, "22")
).toDF("id", "type")

val pivotCols = Map("22"->"a_type", "33"->"b_type")

val pivotDF = df.groupBy("id").pivot("type").agg(count($"type")).na.fill(0)

val resultDF = pivotCols.keys.foldLeft( pivotDF )( (df, c) => 
  if ( df.columns contains c ) df.withColumnRenamed(c, pivotCols(c)) else 
    df.withColumn(pivotCols(c), lit(0))
)

resultDF.show
// +---+------+------+
// | id|a_type|b_type|
// +---+------+------+
// |  1|     2|     0|
// |  2|     3|     0|
// +---+------+------+