如何在 Spark 2.3.0 UDF 中为每个工作人员构造和保留一个引用对象?

How to construct and persist a reference object per worker in a Spark 2.3.0 UDF?

在 Spark 2.3.0 Structured Streaming 作业中,我需要将一列附加到 DataFrame,该列派生自现有列的同一行的值。

我想在 UDF and use 中定义此转换以构建新的 DataFrame。

执行此转换需要咨询一个构建起来非常昂贵的参考对象——每条记录构建一次会产生不可接受的性能。

为每个工作节点构造和保留一次此对象以便它可以在每批中的每条记录中重复引用的最佳方法是什么?请注意,该对象不可序列化。

我目前的尝试围绕子类化 UserDefinedFunction to add the expensive object as a lazy member and providing an alternate constructor to this subclass that does the init normally performed by the udf function 展开,但到目前为止我一直无法让它执行 udf 所做的那种类型强制——一些深层类型推断是当我的转换 lambda 处理输入和输出的字符串时,需要 org.apache.spark.sql.Column 类型的对象。

像这样:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataType

class ExpensiveReference{
  def ExpensiveReference() = ... // Very slow
  def transformString(in:String) = ... // Fast
}

class PersistentValUDF(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]) extends UserDefinedFunction(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]){  
  lazy val ExpensiveReference = new ExpensiveReference()
  def PersistentValUDF(){
    this(((in:String) => ExpensiveReference.transformString(in) ):(String => String), StringType, Some(List(StringType)))
  }
}

我越深入这个兔子洞,就越怀疑有更好的方法来完成我忽略的这个。因此这个 post.

编辑: 我测试了在 UDF 中声明的对象中延迟初始化引用;这会触发重新初始化。示例代码和对象

class IntBox {
  var valu = 0;
  def increment {
    valu = valu + 1
  }
  def get:Int ={
    return valu
  }
}


val altUDF = udf((input:String) => {
  object ExpensiveRef{
     lazy val box = new IntBox
     def transform(in:String):String={
       box.increment
       return in + box.get.toString
     }
  }
  ExpensiveRef.transform(input)
})

上面的UDF总是追加1;所以惰性对象正在按记录重新初始化。

免责声明让我继续做这件事,但请认为这是一项正在进行的工作(反对票是大禁忌:))

我要做的是使用带有 lazy val 的 Scala 对象作为昂贵的参考。

object ExpensiveReference {
  lazy val ref = ???
  def transform(in:String) = {
    // use ref here
  }
}

对于对象,无论您在 Spark 执行器上做什么(无论是 UDF 的一部分还是任何其他计算)都将在第一次访问时实例化 ExpensiveReference.ref。您可以直接访问它或 transform.

的一部分

同样,您是在 UDF 还是 UDAF 或任何其他转换中执行此操作并不重要。关键是一旦计算发生在 Spark 执行器上 "a very-expensive-to-construct reference object -- constructing it once per record yields unacceptable performance." 只会发生一次。

它可以在 UDF 中(只是为了更清楚)。

我发现 this post 我能够将其选项 1 变成可行的解决方案。最终结果类似于 Jacek Laskowski 的回答,但有一些调整:

  1. 将对象定义拉出 UDF 的范围 .即使是懒惰的,如果它在 UDF 的范围内定义,它仍然会重新初始化。
  2. 将转换函数从对象移到 UDF 的 lambda 中(需要避免序列化错误)
  3. 在 UDF lambda 的闭包中捕获对象的惰性成员

像这样:

object ExpensiveReference {
  lazy val ref = ...
  }
val persistentUDF = udf((input:String)=>{
  /*transform code that references ExpensiveReference.ref*/
})