What data type should I use for a tuple in a Spark DataFrame UDF?

Input:

val df = Seq((10, (35, 25))).toDF("id", "scorePair")
df.show
+---+---------+
| id|scorePair|
+---+---------+
| 10| [35, 25]|
+---+---------+
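
For context, the tuple is stored as a struct column; a quick check (using the df defined above):

// scorePair appears as a struct with fields _1 and _2
df.printSchema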

Expected output:

+---+-----------+
| id|totalScore |
+---+-----------+
| 10|         60|
+---+-----------+

I want to do something like this, but it does not accept the type:

// error: a Row value cannot be matched against a tuple pattern
val add = udf((row: Row) => {row match {case (a: Int, b: Int) => a + b}})
df.withColumn("totalScore", add(col("scorePair")))

Why is Row not the right type, given that "DataFrame is an alias for Dataset[Row]"?

What type should I use, and how can I implement it?


val add = udf((rows: Seq[Row]) => {rows.map {case Row(a: Int, b: Int) => a + b}})
df.groupBy("id").agg(collect_list("scorePair") as "pairSeq").withColumn("totalScore1", add(col("pairSeq"))).select(col("id"), explode(col("totalScore1")) as "totalScore").show
+---+----------+
| id|totalScore|
+---+----------+
| 10|        60|
+---+----------+

But that is really not clean!

You can get the values out of the Row with row.getAs[Int](0), row.get(0).asInstanceOf[Int], or row.getInt(0):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import spark.implicits._

val df = Seq(
  (10, (35, 25))
).toDF("id", "scorePair")

// The tuple column is a struct, which the UDF receives as a Row
val add = udf((row: Row) => row.getInt(0) + row.getInt(1))

df.withColumn("totalScore", add($"scorePair")).show(false)

// No UDF needed: the struct fields can be accessed directly
df.select($"id", $"scorePair._1" + $"scorePair._2" as "totalScore").show(false)

Output:

+---+----------+
|id |totalScore|
+---+----------+
|10 |60        |
+---+----------+
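
The original pattern match also works once you match against Row instead of a tuple, since the struct column arrives in the UDF as a Row. A minimal sketch (addByMatch is just an illustrative name, reusing the df defined above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Row has an extractor, so the struct fields can be bound by pattern matching
val addByMatch = udf((row: Row) => row match {
  case Row(a: Int, b: Int) => a + b
})

df.withColumn("totalScore", addByMatch(col("scorePair"))).show(false)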

The aggregate function is the easiest way to sum all the numbers in an ArrayType column. This post has a full example; here is the snippet:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{aggregate, col, lit}

val resDF = df.withColumn(
  "totalScore",
  aggregate(
    col("scorePair"),
    lit(0),
    (col1: Column, col2: Column) => col1 + col2
  )
)

You want to avoid UDFs whenever possible. Note that this solution only works with Spark 3+.
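
For completeness, a minimal sketch of aggregate on an actual ArrayType column (arrDF and the scores column are hypothetical, since scorePair in the question is a struct rather than an array):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{aggregate, col, lit}
import spark.implicits._

// Hypothetical array-typed data; aggregate folds the elements into one value
val arrDF = Seq((10, Seq(35, 25))).toDF("id", "scores")

arrDF.withColumn(
  "totalScore",
  aggregate(col("scores"), lit(0), (acc: Column, x: Column) => acc + x)
).show(false)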