使用不可序列化的对象创建 SparkSQL UDF

Create SparkSQL UDF with non serializable objects

我正在尝试编写一个我想在 sqlContext 中的 Hive tables 上使用的 UDF。是否有可能以任何方式包含来自其他不可序列化的库的对象?这是不起作用的最小示例:

def myUDF(s: String) = {
 import sun.misc.BASE64Encoder
 val coder= new BASE64Encoder
 val encoded= decoder.encode(s)
 encoded
}

我在spark中注册函数shell为udf函数

val encoding = sqlContext.udf.register("encoder", myUDF)

如果我尝试 运行 它在 table "test"

sqlContext.sql("SELECT encoder(colname) from test").show()

我收到错误

org.apache.spark.SparkException: Task not serializable
object not serializable (class: sun.misc.BASE64Encoder, value: sun.misc.BASE64Encoder@4a7f9a94)

有解决办法吗?我尝试将 myUDF 嵌入到一个对象和一个 class 中,但这也不起作用。

您可以尝试将 udf 函数定义为

def encoder = udf((s: String) => {
  import sun.misc.BASE64Encoder
  val coder= new BASE64Encoder
  val encoded= coder.encode(s.getBytes("UTF-8"))
  encoded
})

并将 udf 函数调用为

dataframe.withColumn("encoded", encoder(col("id"))).show

已更新

正如@santon 指出的那样,BASE64Encoder 编码器是为 数据帧 中的每个 启动的,这可能会导致性能问题。解决方案是创建 BASE64Encoder 静态对象 并在 udf 函数中调用它。