无法执行用户定义的函数
Failed to execute user defined function
我有以下 UDF:
val jac_index:(Array[String],Array[String])=>Float=(Sq1:Array[String],Sq2:Array[String])=>
{
val Sq3=Sq1.intersect(Sq2)
val Sq4=Sq1.union(Sq2).distinct
if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F
}
val jacUDF=udf(jac_index)
而当我执行下面这句话的时候
val movie_jac_df=movie_pairs_df.withColumn("jac",jacUDF(movie_pairs_df("name"),movie_pairs_df("name2")))
我收到错误 "Failed to execute user defined function"
movie_pairs_df的架构如下
root
|-- movie: string (nullable = true)
|-- name: array (nullable = true)
| |-- element: string (containsNull = true)
|-- movie2: string (nullable = true)
|-- name2: array (nullable = true)
| |-- element: string (containsNull = true)
那是什么原因呢?
Spark 的 DataFrames 模型数组列为 mutable.WrappedArray
,这意味着您的 UDF 应该将两个 WrappedArrays 作为其输入;
如果您将 jac_index
更改为期望两个这样的数组:
import scala.collection.mutable
val jac_index: (mutable.WrappedArray[String], mutable.WrappedArray[String]) => Float =
(Sq1, Sq2) => { /* same implementation */ }
这将按预期工作。
定义udf如下
val jacUDF = udf((Sq1:mutable.WrappedArray[String], Sq2:mutable.WrappedArray[String]) => {
val Sq3=Sq1.intersect(Sq2)
val Sq4=Sq1.union(Sq2).distinct
if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F
})
我有以下 UDF:
val jac_index:(Array[String],Array[String])=>Float=(Sq1:Array[String],Sq2:Array[String])=>
{
val Sq3=Sq1.intersect(Sq2)
val Sq4=Sq1.union(Sq2).distinct
if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F
}
val jacUDF=udf(jac_index)
而当我执行下面这句话的时候
val movie_jac_df=movie_pairs_df.withColumn("jac",jacUDF(movie_pairs_df("name"),movie_pairs_df("name2")))
我收到错误 "Failed to execute user defined function"
movie_pairs_df的架构如下
root
|-- movie: string (nullable = true)
|-- name: array (nullable = true)
| |-- element: string (containsNull = true)
|-- movie2: string (nullable = true)
|-- name2: array (nullable = true)
| |-- element: string (containsNull = true)
那是什么原因呢?
Spark 的 DataFrames 模型数组列为 mutable.WrappedArray
,这意味着您的 UDF 应该将两个 WrappedArrays 作为其输入;
如果您将 jac_index
更改为期望两个这样的数组:
import scala.collection.mutable
val jac_index: (mutable.WrappedArray[String], mutable.WrappedArray[String]) => Float =
(Sq1, Sq2) => { /* same implementation */ }
这将按预期工作。
定义udf如下
val jacUDF = udf((Sq1:mutable.WrappedArray[String], Sq2:mutable.WrappedArray[String]) => {
val Sq3=Sq1.intersect(Sq2)
val Sq4=Sq1.union(Sq2).distinct
if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F
})