将 ArrayType 列传递给 Spark Scala 中的 UDF
Pass a ArrayType column to UDF in Spark Scala
我在 Scala 中的 Spark 数据框中有一列是使用
聚合多个列而生成的
agg(collect_list(struct(col(abc), col(aaa)).as(def)
我想将此列传递给 UDF 以进一步处理此聚合列中的索引之一。
当我将参数传递给我的 UDF 时:
.withColumn(def, remove
(col(xyz), col(def)))
UDF- 类型为 Seq[Row]:
val removeUnstableActivations: UserDefinedFunction = udf((xyz: java.util.Date, def: Seq[Row])
我收到错误:
Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported
我应该如何传递这些列以及 UDF 中该列的数据类型应该是什么?
确实不支持 Row 类型的架构,但您可以 return 一个案例 class。 Spark 会将 returned case class 视为 StructType。例如:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Row
val df = Seq(
(1, "a"),
(2, "b"),
(3, "c")
).toDF("number", "word")
val aggDf = df.agg(
collect_list(struct(col("number"), col("word"))) as "aggColumn"
)
aggDf.printSchema()
// |-- aggColumn: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- number: string (nullable = true)
// | | |-- word: integer (nullable = false)
case class ReturnSchema(word: String, number: Int)
val myUdf: UserDefinedFunction =
udf((collection: Seq[Row]) => {
collection.map(r => {
val word = r.getAs[String]("word")
val newNumber = r.getAs[Int]("number") * 100
new ReturnSchema(word, newNumber)
})
})
val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTranformedColumn"))
finalDf.printSchema
// root
// |-- udfTranformedColumn: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- word: string (nullable = true)
// | | |-- number: integer (nullable = false)
finalDf.show(false)
// +------------------------------+
// |udfTranformedColumn |
// +------------------------------+
// |[[a, 100], [b, 200], [c, 300]]|
// +------------------------------+
我在 Scala 中的 Spark 数据框中有一列是使用
聚合多个列而生成的 agg(collect_list(struct(col(abc), col(aaa)).as(def)
我想将此列传递给 UDF 以进一步处理此聚合列中的索引之一。
当我将参数传递给我的 UDF 时:
.withColumn(def, remove
(col(xyz), col(def)))
UDF- 类型为 Seq[Row]: val removeUnstableActivations: UserDefinedFunction = udf((xyz: java.util.Date, def: Seq[Row])
我收到错误:
Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported
我应该如何传递这些列以及 UDF 中该列的数据类型应该是什么?
确实不支持 Row 类型的架构,但您可以 return 一个案例 class。 Spark 会将 returned case class 视为 StructType。例如:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Row
val df = Seq(
(1, "a"),
(2, "b"),
(3, "c")
).toDF("number", "word")
val aggDf = df.agg(
collect_list(struct(col("number"), col("word"))) as "aggColumn"
)
aggDf.printSchema()
// |-- aggColumn: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- number: string (nullable = true)
// | | |-- word: integer (nullable = false)
case class ReturnSchema(word: String, number: Int)
val myUdf: UserDefinedFunction =
udf((collection: Seq[Row]) => {
collection.map(r => {
val word = r.getAs[String]("word")
val newNumber = r.getAs[Int]("number") * 100
new ReturnSchema(word, newNumber)
})
})
val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTranformedColumn"))
finalDf.printSchema
// root
// |-- udfTranformedColumn: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- word: string (nullable = true)
// | | |-- number: integer (nullable = false)
finalDf.show(false)
// +------------------------------+
// |udfTranformedColumn |
// +------------------------------+
// |[[a, 100], [b, 200], [c, 300]]|
// +------------------------------+