将 ArrayType 列传递给 Spark Scala 中的 UDF

Pass a ArrayType column to UDF in Spark Scala

我在 Scala 中的 Spark 数据框中有一列是使用

聚合多个列而生成的
 agg(collect_list(struct(col(abc), col(aaa)).as(def)

我想将此列传递给 UDF 以进一步处理此聚合列中的索引之一。

当我将参数传递给我的 UDF 时:

.withColumn(def, remove
            (col(xyz), col(def)))

UDF- 类型为 Seq[Row]: val removeUnstableActivations: UserDefinedFunction = udf((xyz: java.util.Date, def: Seq[Row])

我收到错误:

Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported

我应该如何传递这些列以及 UDF 中该列的数据类型应该是什么?

确实不支持 Row 类型的架构,但您可以 return 一个案例 class。 Spark 会将 returned case class 视为 StructType。例如:

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Row

val df = Seq(
  (1, "a"),
  (2, "b"),
  (3, "c")
).toDF("number", "word")

val aggDf = df.agg(
  collect_list(struct(col("number"), col("word"))) as "aggColumn"
)

aggDf.printSchema()
// |-- aggColumn: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- number: string (nullable = true)
// |    |    |-- word: integer (nullable = false)

case class ReturnSchema(word: String, number: Int)

val myUdf: UserDefinedFunction =
  udf((collection: Seq[Row]) => {
    collection.map(r => {
      val word   = r.getAs[String]("word")
      val newNumber = r.getAs[Int]("number") * 100

      new ReturnSchema(word, newNumber)
    })
  })
  
val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTranformedColumn"))

finalDf.printSchema
// root
//  |-- udfTranformedColumn: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- word: string (nullable = true)
//  |    |    |-- number: integer (nullable = false)

finalDf.show(false)
// +------------------------------+
// |udfTranformedColumn           |
// +------------------------------+
// |[[a, 100], [b, 200], [c, 300]]|
// +------------------------------+