使用匿名函数时 Spark TaskNotSerializable

Spark TaskNotSerializable when using anonymous function

背景

这是我的情况:我正在尝试创建一个 class 来根据内容的某些特征过滤 RDD,但该特征在不同情况下可能会有所不同,因此我想对其进行参数化有一个功能。不幸的是,我似乎 运行 对 Scala 捕获其闭包的方式存在疑问。尽管我的函数是可序列化的,但 class 不是。

spark source on closure cleaning 中的示例来看,我的情况似乎无法解决,但我相信有一种方法可以通过创建正确的 (更小)闭包。

我的代码

class MyFilter(getFeature: Element => String, other: NonSerializable) {
  def filter(rdd: RDD[Element]): RDD[Element] = {
    // All my complicated logic I want to share
    rdd.filter { elem => getFeature(elem) == "myTargetString" }     
}

简化示例

class Foo(f: Int => Double, rdd: RDD[Int]) { 
  def go(data: RDD[Int]) = data.map(f) 
}

val works = new Foo(_.toDouble, otherRdd)
works.go(myRdd).collect() // works

val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0)
val doesntWork = new Foo(complicatedButSerializableFunc, otherRdd)
doesntWork.go(myRdd).collect() // craps out

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo
Serialization stack:
    - object not serializable (class: $iwC$$iwC$Foo, value: $iwC$$iwC$Foo@61e33118)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: foo, type: class $iwC$$iwC$Foo)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@47d6a31a)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun, <function1>)

// Even though
val out = new ObjectOutputStream(new FileOutputStream("test.obj"))
out.writeObject(complicatedButSerializableFunc) // works

问题

  1. 为什么第一个简化示例不尝试序列化所有 Foo 而第二个示例却尝试序列化?
  2. 如何在不在闭包中包含对 Foo 的引用的情况下获取对可序列化函数的引用?

this article 的帮助下找到了答案。

本质上,当为给定函数创建闭包时,Scala 将包含引用的任何复杂字段的整个对象(如果有人对第一个简单示例中为什么没有发生这种情况有很好的解释,我会接受那个答案)。解决方案是将可序列化值传递给不同的函数,以便仅保留最小引用,非常类似于事件侦听器的 ol' javascript for 循环范例。

例子

def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

class Foo(f: Int => Double, somethingNonserializable: RDD[String]) { 
 def go(data: RDD[Int]) = enclose(f) { actualFunction => data.map(actualFunction) } 
}

或者用JS风格的自执行匿名函数

def go(data: RDD[Int]) = ((actualFunction: Int => Double) => data.map(actualFunction))(f)