如何命名聚合列?

How to name aggregate columns?

我在 Scala 中使用 Spark,我的聚合列是匿名的。有没有一种方便的方法可以重命名数据集中的多列?我考虑过用 as 强加一个模式,但是键列是一个结构(由于 groupBy 操作),我找不到如何用 case class 定义 StructType在里面。

我尝试按如下方式定义模式:

val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
                                                             StructField("dst", IntegerType), true)), 
                              StructField("count", LongType, true))
edge_count.as[returnSchema]

但是我遇到了一个编译错误:

Message: <console>:74: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
       val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),

最好的解决方案是明确命名您的列,例如,

df
  .groupBy('a, 'b)
  .agg(
    expr("count(*) as cnt"),
    expr("sum(x) as x"),
    expr("sum(y)").as("y")
  )

如果您使用的是数据集,则必须提供列的类型,例如 expr("count(*) as cnt").as[Long]

您可以直接使用 DSL,但我经常发现它比简单的 SQL 表达式更冗长。

如果要进行批量重命名,请使用 Map,然后 foldLeft 数据框。

我最终在 select 语句中使用了 aliases;例如,

ds.select($"key.src".as[Short], 
          $"key.dst".as[Short], 
          $"sum(count)".alias("count").as[Long])

首先,我必须使用 printSchema 来确定派生列名称:

> ds.printSchema

root
 |-- key: struct (nullable = false)
 |    |-- src: short (nullable = false)
 |    |-- dst: short (nullable = false)
 |-- sum(count): long (nullable = true)

我同意 Sim 的回答,最方便的方法是在创建时明确命名列。这是另一种方式如何别名列(不使用 expr):

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, "a"),
  (2, "b"),
  (3, "c")
).toDF("number", "word")

val aggDf = df.agg(
  collect_list(struct(col("number"), col("word"))) as "myStruct",
  sum(col("number")) as "mySum",
  count(col("*")) as "myCnt"
)

aggDf.printSchema

// |-- myStruct: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- number: integer (nullable = false)
// |    |    |-- word: string (nullable = true)
// |-- mySum: long (nullable = true)
// |-- myCnt: long (nullable = false)

aggDf.show()

// +------------------------+-----+-----+
// |myStruct                |mySum|myCnt|
// +------------------------+-----+-----+
// |[[1, a], [2, b], [3, c]]|6    |3    |
// +------------------------+-----+-----+