定义接受 Spark DataFrame 中对象数组的 UDF?
Defining a UDF that accepts an Array of objects in a Spark DataFrame?
使用 Spark 的数据帧时,需要用户定义函数 (UDF) 来映射列中的数据。 UDF 要求显式指定参数类型。在我的例子中,我需要操作一个由对象数组组成的列,但我不知道要使用什么类型。这是一个例子:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
使用内置的org.apache.spark.sql.functions
对列中的数据进行基本操作相对简单
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
并且通常很容易编写自定义 UDF 来执行任意操作
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
但是如果我想使用 UDF 来操作 "subjects" 列中的对象数组怎么办? UDF 中的参数使用什么类型?例如,如果我想重新实现 size 函数,而不是使用 spark 提供的函数:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
显然 Array[Something]
不起作用...我应该使用什么类型!?我应该完全放弃 Array[]
吗?四处搜寻告诉我 scala.collection.mutable.WrappedArray
可能与它有关,但我仍然需要提供另一种类型。
您要找的是Seq[o.a.s.sql.Row]
:
import org.apache.spark.sql.Row
val my_size = udf { subjects: Seq[Row] => subjects.size }
解释:
ArrayType
的当前表示是,如您所知,WrappedArray
所以Array
不会起作用,最好保持安全。
- According to the official specification,
StructType
的本地(外部)类型是 Row
。不幸的是,这意味着对各个字段的访问不是类型安全的。
备注:
要在 Spark < 2.3 中创建 struct
,传递给 udf
的函数必须 return Product
类型(Tuple*
或case class
),而不是 Row
。那是因为相应的 udf
变体 depend on Scala reflection:
Defines a Scala closure of n arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure's signature.
在 Spark >= 2.3 中可以直接 return Row
,as long as the schema is provided.
def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant, the caller must specify the output data type, and there is no automatic input type coercion.
参见示例 How to create a Spark UDF in Java / Kotlin which returns a complex type?。
使用 Spark 的数据帧时,需要用户定义函数 (UDF) 来映射列中的数据。 UDF 要求显式指定参数类型。在我的例子中,我需要操作一个由对象数组组成的列,但我不知道要使用什么类型。这是一个例子:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
使用内置的org.apache.spark.sql.functions
对列中的数据进行基本操作相对简单
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
并且通常很容易编写自定义 UDF 来执行任意操作
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
但是如果我想使用 UDF 来操作 "subjects" 列中的对象数组怎么办? UDF 中的参数使用什么类型?例如,如果我想重新实现 size 函数,而不是使用 spark 提供的函数:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
显然 Array[Something]
不起作用...我应该使用什么类型!?我应该完全放弃 Array[]
吗?四处搜寻告诉我 scala.collection.mutable.WrappedArray
可能与它有关,但我仍然需要提供另一种类型。
您要找的是Seq[o.a.s.sql.Row]
:
import org.apache.spark.sql.Row
val my_size = udf { subjects: Seq[Row] => subjects.size }
解释:
ArrayType
的当前表示是,如您所知,WrappedArray
所以Array
不会起作用,最好保持安全。- According to the official specification,
StructType
的本地(外部)类型是Row
。不幸的是,这意味着对各个字段的访问不是类型安全的。
备注:
要在 Spark < 2.3 中创建
struct
,传递给udf
的函数必须 returnProduct
类型(Tuple*
或case class
),而不是Row
。那是因为相应的udf
变体 depend on Scala reflection:Defines a Scala closure of n arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure's signature.
在 Spark >= 2.3 中可以直接 return
Row
,as long as the schema is provided.def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant, the caller must specify the output data type, and there is no automatic input type coercion.参见示例 How to create a Spark UDF in Java / Kotlin which returns a complex type?。