UDF 中的 Spark classnotfoundexception
Spark classnotfoundexception in UDF
当我调用一个函数时它起作用了。但是当我在 UDF 中调用该函数时将不起作用。
这是完整代码。
val sparkConf = new SparkConf().setAppName("HiveFromSpark").set("spark.driver.allowMultipleContexts","true")
val sc = new SparkContext(sparkConf)
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
///////////// UDFS
def toDoubleArrayFun(vec:Any) : scala.Array[Double] = {
return vec.asInstanceOf[WrappedArray[Double]].toArray
}
def toDoubleArray=udf((vec:Any) => toDoubleArrayFun(vec))
//////////// PROCESS
var df = hive.sql("select vec from mst_wordvector_tapi_128dim where word='soccer'")
println("==== test get value then transform")
println(df.head().get(0))
println(toDoubleArrayFun(df.head().get(0)))
println("==== test transform by udf")
df.withColumn("word_v", toDoubleArray(col("vec")))
.show(10);
然后是这个输出。
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6e9484ad
hive: org.apache.spark.sql.hive.HiveContext =
toDoubleArrayFun: (vec: Any)Array[Double]
toDoubleArray: org.apache.spark.sql.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [vec: array<double>]
==== test get value then transform
WrappedArray(-0.88675,, 0.0216657)
[D@4afcc447
==== test transform by udf
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, xdad008.band.nhnsystem.com): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$ba2a895f25683dd48fe725fd825a71$$$$$$iwC$$anonfun$toDoubleArray
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
此处完整输出。
https://gist.github.com/jeesim2/efb52f12d6cd4c1b255fd0c917411370
如您所见,"toDoubleArrayFun" 函数运行良好,但在 udf 中它声明 ClassNotFoundException。
我不能改变hive的数据结构,需要将vec转换成Array[Double]来创建一个Vector实例
那么上面的代码有什么问题?
Spark 版本为 1.6.1
更新 1
Hive table 的 'vec' 列类型是“array<double>
”
下面的代码也会导致错误
var df = hive.sql("select vec from mst_wordvector_tapi_128dim where
word='hh'")
df.printSchema()
var word_vec = df.head().get(0)
println(word_vec)
println(Vectors.dense(word_vec))
产出
df: org.apache.spark.sql.DataFrame = [vec: array<double>]
root
|-- vec: array (nullable = true)
| |-- element: double (containsNull = true)
==== test get value then transform
word_vec: Any = WrappedArray(-0.88675,...7)
<console>:288: error: overloaded method value dense with alternatives:
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
(firstValue: Double,otherValues:Double*)org.apache.spark.mllib.linalg.Vector
cannot be applied to (Any)
println(Vectors.dense(word_vec))
这意味着无法将配置单元“array<double>
”列转换为 Array<Double>
实际上我想用两个 array<double>
列来计算 distance:Double。
如何添加基于 array<double>
列的向量列?
典型的方法是
Vectors.sqrt(Vectors.dense(Array<Double>, Array<Double>)
由于udf
函数必须进行序列化和反序列化过程,因此any
DataType 将不起作用。您必须定义要传递给 udf
函数的列的确切数据类型。
从您问题的输出来看,您的数据框中似乎只有一列,即 vec
,属于 Array[Double]
类型
df: org.apache.spark.sql.DataFrame = [vec: array<double>]
实际上不需要那个 udf 函数,因为你的 vec
列已经是 Array
数据类型,这也是你的 udf
函数正在做的,即转换值至 Array[Double]
.
现在,您的其他函数调用正在运行
println(toDoubleArrayFun(df.head().get(0)))
因为不需要序列化和反序列化过程,它只是scala函数调用。
当我调用一个函数时它起作用了。但是当我在 UDF 中调用该函数时将不起作用。
这是完整代码。
val sparkConf = new SparkConf().setAppName("HiveFromSpark").set("spark.driver.allowMultipleContexts","true")
val sc = new SparkContext(sparkConf)
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
///////////// UDFS
def toDoubleArrayFun(vec:Any) : scala.Array[Double] = {
return vec.asInstanceOf[WrappedArray[Double]].toArray
}
def toDoubleArray=udf((vec:Any) => toDoubleArrayFun(vec))
//////////// PROCESS
var df = hive.sql("select vec from mst_wordvector_tapi_128dim where word='soccer'")
println("==== test get value then transform")
println(df.head().get(0))
println(toDoubleArrayFun(df.head().get(0)))
println("==== test transform by udf")
df.withColumn("word_v", toDoubleArray(col("vec")))
.show(10);
然后是这个输出。
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6e9484ad
hive: org.apache.spark.sql.hive.HiveContext =
toDoubleArrayFun: (vec: Any)Array[Double]
toDoubleArray: org.apache.spark.sql.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [vec: array<double>]
==== test get value then transform
WrappedArray(-0.88675,, 0.0216657)
[D@4afcc447
==== test transform by udf
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, xdad008.band.nhnsystem.com): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$ba2a895f25683dd48fe725fd825a71$$$$$$iwC$$anonfun$toDoubleArray
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
此处完整输出。 https://gist.github.com/jeesim2/efb52f12d6cd4c1b255fd0c917411370
如您所见,"toDoubleArrayFun" 函数运行良好,但在 udf 中它声明 ClassNotFoundException。
我不能改变hive的数据结构,需要将vec转换成Array[Double]来创建一个Vector实例
那么上面的代码有什么问题?
Spark 版本为 1.6.1
更新 1
Hive table 的 'vec' 列类型是“array<double>
”
下面的代码也会导致错误
var df = hive.sql("select vec from mst_wordvector_tapi_128dim where
word='hh'")
df.printSchema()
var word_vec = df.head().get(0)
println(word_vec)
println(Vectors.dense(word_vec))
产出
df: org.apache.spark.sql.DataFrame = [vec: array<double>]
root
|-- vec: array (nullable = true)
| |-- element: double (containsNull = true)
==== test get value then transform
word_vec: Any = WrappedArray(-0.88675,...7)
<console>:288: error: overloaded method value dense with alternatives:
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
(firstValue: Double,otherValues:Double*)org.apache.spark.mllib.linalg.Vector
cannot be applied to (Any)
println(Vectors.dense(word_vec))
这意味着无法将配置单元“array<double>
”列转换为 Array<Double>
实际上我想用两个 array<double>
列来计算 distance:Double。
如何添加基于 array<double>
列的向量列?
典型的方法是
Vectors.sqrt(Vectors.dense(Array<Double>, Array<Double>)
由于udf
函数必须进行序列化和反序列化过程,因此any
DataType 将不起作用。您必须定义要传递给 udf
函数的列的确切数据类型。
从您问题的输出来看,您的数据框中似乎只有一列,即 vec
,属于 Array[Double]
类型
df: org.apache.spark.sql.DataFrame = [vec: array<double>]
实际上不需要那个 udf 函数,因为你的 vec
列已经是 Array
数据类型,这也是你的 udf
函数正在做的,即转换值至 Array[Double]
.
现在,您的其他函数调用正在运行
println(toDoubleArrayFun(df.head().get(0)))
因为不需要序列化和反序列化过程,它只是scala函数调用。