我应该为 Spark Dataframe udf 中的元组使用什么数据类型?
What data type should I use for tuple in Spark Dataframe udf?
输入:
val df = Seq((10, (35, 25))).toDF("id", "scorePair")
df.show
+---+---------+
| id|scorePair|
+---+---------+
| 10| [35, 25]|
+---+---------+
预期输出:
+---+-----------+
| id|totalScore |
+---+-----------+
| 10| 60|
+---+-----------+
想做这样的事情,但它不接受行类型:
// error
val add = udf((row: Row) => {row match {case (a: Int, b: Int) => a + b}})
df.withColumn("totalScore", add(col("scorePair")))
为什么Row类型是不正确的想法
"Dataframe is an alias for Dataset[Row]"
?
我应该使用什么类型?我怎样才能实现它?
- 我强调 Row 类型,因为至少我设法按以下方式使用 Row(它处理一列作为行)实现:
val add = udf((rows: Seq[Row]) => {rows.map {case Row(a: Int, b: Int) => a + b}})
df.groupBy("id").agg(collect_list("scorePair") as "pairSeq").withColumn("totalScore1", add(col("pairSeq"))).select(col("id"), explode(col("totalScore1")) as "totalScore").show
+---+----------+
| id|totalScore|
+---+----------+
| 10| 60|
+---+----------+
但是那真的不干净!
您可以使用 row.getAs[Int](0)
、row.get(0).asInstanceOf[Int]
、row.getInt(0)
从行
中获取值
val df = Seq(
(10, (35, 25))
).toDF("id", "scorePair")
val add = udf((row: Row) => {row.getInt(0) + row.getInt(1)})
df.withColumn("totalScore", add($"scorePair")).show(false)
df.select($"id", $"scorePair._1" + $"scorePair._2" as "totalScore").show(false)
输出:
+---+----------+
|id |totalScore|
+---+----------+
|10 |60 |
+---+----------+
aggregate
函数是对 ArrayType 列中的所有数字求和的最简单方法。 This post 有一个完整的例子。这是片段:
val resDF = df.withColumn(
"totalScore",
aggregate(
col("scorePair"),
lit(0),
(col1: Column, col2: Column) => col1 + col2
)
)
您想尽可能避免使用 UDF。此解决方案仅适用于 Spark 3+。
输入:
val df = Seq((10, (35, 25))).toDF("id", "scorePair")
df.show
+---+---------+
| id|scorePair|
+---+---------+
| 10| [35, 25]|
+---+---------+
预期输出:
+---+-----------+
| id|totalScore |
+---+-----------+
| 10| 60|
+---+-----------+
想做这样的事情,但它不接受行类型:
// error
val add = udf((row: Row) => {row match {case (a: Int, b: Int) => a + b}})
df.withColumn("totalScore", add(col("scorePair")))
为什么Row类型是不正确的想法
"Dataframe is an alias for Dataset[Row]"
?
我应该使用什么类型?我怎样才能实现它?
- 我强调 Row 类型,因为至少我设法按以下方式使用 Row(它处理一列作为行)实现:
val add = udf((rows: Seq[Row]) => {rows.map {case Row(a: Int, b: Int) => a + b}})
df.groupBy("id").agg(collect_list("scorePair") as "pairSeq").withColumn("totalScore1", add(col("pairSeq"))).select(col("id"), explode(col("totalScore1")) as "totalScore").show
+---+----------+
| id|totalScore|
+---+----------+
| 10| 60|
+---+----------+
但是那真的不干净!
您可以使用 row.getAs[Int](0)
、row.get(0).asInstanceOf[Int]
、row.getInt(0)
从行
val df = Seq(
(10, (35, 25))
).toDF("id", "scorePair")
val add = udf((row: Row) => {row.getInt(0) + row.getInt(1)})
df.withColumn("totalScore", add($"scorePair")).show(false)
df.select($"id", $"scorePair._1" + $"scorePair._2" as "totalScore").show(false)
输出:
+---+----------+
|id |totalScore|
+---+----------+
|10 |60 |
+---+----------+
aggregate
函数是对 ArrayType 列中的所有数字求和的最简单方法。 This post 有一个完整的例子。这是片段:
val resDF = df.withColumn(
"totalScore",
aggregate(
col("scorePair"),
lit(0),
(col1: Column, col2: Column) => col1 + col2
)
)
您想尽可能避免使用 UDF。此解决方案仅适用于 Spark 3+。