如何命名聚合列?
How to name aggregate columns?
我在 Scala 中使用 Spark,我的聚合列是匿名的。有没有一种方便的方法可以重命名数据集中的多列?我考虑过用 as
强加一个模式,但是键列是一个结构(由于 groupBy
操作),我找不到如何用 case class
定义 StructType
在里面。
我尝试按如下方式定义模式:
val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
StructField("dst", IntegerType), true)),
StructField("count", LongType, true))
edge_count.as[returnSchema]
但是我遇到了一个编译错误:
Message: <console>:74: error: overloaded method value apply with alternatives:
(fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
最好的解决方案是明确命名您的列,例如,
df
.groupBy('a, 'b)
.agg(
expr("count(*) as cnt"),
expr("sum(x) as x"),
expr("sum(y)").as("y")
)
如果您使用的是数据集,则必须提供列的类型,例如 expr("count(*) as cnt").as[Long]
。
您可以直接使用 DSL,但我经常发现它比简单的 SQL 表达式更冗长。
如果要进行批量重命名,请使用 Map
,然后 foldLeft
数据框。
我最终在 select
语句中使用了 alias
es;例如,
ds.select($"key.src".as[Short],
$"key.dst".as[Short],
$"sum(count)".alias("count").as[Long])
首先,我必须使用 printSchema
来确定派生列名称:
> ds.printSchema
root
|-- key: struct (nullable = false)
| |-- src: short (nullable = false)
| |-- dst: short (nullable = false)
|-- sum(count): long (nullable = true)
我同意 Sim 的回答,最方便的方法是在创建时明确命名列。这是另一种方式如何别名列(不使用 expr
):
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
(1, "a"),
(2, "b"),
(3, "c")
).toDF("number", "word")
val aggDf = df.agg(
collect_list(struct(col("number"), col("word"))) as "myStruct",
sum(col("number")) as "mySum",
count(col("*")) as "myCnt"
)
aggDf.printSchema
// |-- myStruct: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- number: integer (nullable = false)
// | | |-- word: string (nullable = true)
// |-- mySum: long (nullable = true)
// |-- myCnt: long (nullable = false)
aggDf.show()
// +------------------------+-----+-----+
// |myStruct |mySum|myCnt|
// +------------------------+-----+-----+
// |[[1, a], [2, b], [3, c]]|6 |3 |
// +------------------------+-----+-----+
我在 Scala 中使用 Spark,我的聚合列是匿名的。有没有一种方便的方法可以重命名数据集中的多列?我考虑过用 as
强加一个模式,但是键列是一个结构(由于 groupBy
操作),我找不到如何用 case class
定义 StructType
在里面。
我尝试按如下方式定义模式:
val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
StructField("dst", IntegerType), true)),
StructField("count", LongType, true))
edge_count.as[returnSchema]
但是我遇到了一个编译错误:
Message: <console>:74: error: overloaded method value apply with alternatives:
(fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
最好的解决方案是明确命名您的列,例如,
df
.groupBy('a, 'b)
.agg(
expr("count(*) as cnt"),
expr("sum(x) as x"),
expr("sum(y)").as("y")
)
如果您使用的是数据集,则必须提供列的类型,例如 expr("count(*) as cnt").as[Long]
。
您可以直接使用 DSL,但我经常发现它比简单的 SQL 表达式更冗长。
如果要进行批量重命名,请使用 Map
,然后 foldLeft
数据框。
我最终在 select
语句中使用了 alias
es;例如,
ds.select($"key.src".as[Short],
$"key.dst".as[Short],
$"sum(count)".alias("count").as[Long])
首先,我必须使用 printSchema
来确定派生列名称:
> ds.printSchema
root
|-- key: struct (nullable = false)
| |-- src: short (nullable = false)
| |-- dst: short (nullable = false)
|-- sum(count): long (nullable = true)
我同意 Sim 的回答,最方便的方法是在创建时明确命名列。这是另一种方式如何别名列(不使用 expr
):
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
(1, "a"),
(2, "b"),
(3, "c")
).toDF("number", "word")
val aggDf = df.agg(
collect_list(struct(col("number"), col("word"))) as "myStruct",
sum(col("number")) as "mySum",
count(col("*")) as "myCnt"
)
aggDf.printSchema
// |-- myStruct: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- number: integer (nullable = false)
// | | |-- word: string (nullable = true)
// |-- mySum: long (nullable = true)
// |-- myCnt: long (nullable = false)
aggDf.show()
// +------------------------+-----+-----+
// |myStruct |mySum|myCnt|
// +------------------------+-----+-----+
// |[[1, a], [2, b], [3, c]]|6 |3 |
// +------------------------+-----+-----+