使用不可序列化的对象创建 SparkSQL UDF
Create SparkSQL UDF with non serializable objects
我正在尝试编写一个我想在 sqlContext 中的 Hive tables 上使用的 UDF。是否有可能以任何方式包含来自其他不可序列化的库的对象?这是不起作用的最小示例:
def myUDF(s: String) = {
import sun.misc.BASE64Encoder
val coder= new BASE64Encoder
val encoded= decoder.encode(s)
encoded
}
我在spark中注册函数shell为udf函数
val encoding = sqlContext.udf.register("encoder", myUDF)
如果我尝试 运行 它在 table "test"
sqlContext.sql("SELECT encoder(colname) from test").show()
我收到错误
org.apache.spark.SparkException: Task not serializable
object not serializable (class: sun.misc.BASE64Encoder, value: sun.misc.BASE64Encoder@4a7f9a94)
有解决办法吗?我尝试将 myUDF 嵌入到一个对象和一个 class 中,但这也不起作用。
您可以尝试将 udf
函数定义为
def encoder = udf((s: String) => {
import sun.misc.BASE64Encoder
val coder= new BASE64Encoder
val encoded= coder.encode(s.getBytes("UTF-8"))
encoded
})
并将 udf
函数调用为
dataframe.withColumn("encoded", encoder(col("id"))).show
已更新
正如@santon 指出的那样,BASE64Encoder
编码器是为 数据帧 中的每个 行 启动的,这可能会导致性能问题。解决方案是创建 BASE64Encoder
的 静态对象 并在 udf
函数中调用它。
我正在尝试编写一个我想在 sqlContext 中的 Hive tables 上使用的 UDF。是否有可能以任何方式包含来自其他不可序列化的库的对象?这是不起作用的最小示例:
def myUDF(s: String) = {
import sun.misc.BASE64Encoder
val coder= new BASE64Encoder
val encoded= decoder.encode(s)
encoded
}
我在spark中注册函数shell为udf函数
val encoding = sqlContext.udf.register("encoder", myUDF)
如果我尝试 运行 它在 table "test"
sqlContext.sql("SELECT encoder(colname) from test").show()
我收到错误
org.apache.spark.SparkException: Task not serializable
object not serializable (class: sun.misc.BASE64Encoder, value: sun.misc.BASE64Encoder@4a7f9a94)
有解决办法吗?我尝试将 myUDF 嵌入到一个对象和一个 class 中,但这也不起作用。
您可以尝试将 udf
函数定义为
def encoder = udf((s: String) => {
import sun.misc.BASE64Encoder
val coder= new BASE64Encoder
val encoded= coder.encode(s.getBytes("UTF-8"))
encoded
})
并将 udf
函数调用为
dataframe.withColumn("encoded", encoder(col("id"))).show
已更新
正如@santon 指出的那样,BASE64Encoder
编码器是为 数据帧 中的每个 行 启动的,这可能会导致性能问题。解决方案是创建 BASE64Encoder
的 静态对象 并在 udf
函数中调用它。