在 spark-shell 中使用全局对象时隐式 val 序列化
implicit val serialization when using global object in spark-shell
我不清楚为什么(不可序列化的)隐式 val 在此处被序列化(抛出异常):
implicit val sc2:SparkContext = sc
val s1 = "asdf"
sc.parallelize(Array(1,2,3)).map(x1 => s1.map(x => 4))
但不是当 s1 的值在闭包范围内时:
implicit val sc2:SparkContext = sc
sc.parallelize(Array(1,2,3)).map(x1 => "asdf".map(x => 4))
我的用例显然更复杂,但我已将其归结为这个问题。
(解决方案是将隐式val定义为@transient)
这取决于这些行所在的范围:
让我们来看看三个选项 - 在 方法 中,在 class 中没有 s1
,并在 class 和 s1
:
object TTT {
val sc = new SparkContext("local", "test")
def main(args: Array[String]): Unit = {
new A().foo() // works
new B // works
new C // fails
}
class A {
def foo(): Unit = {
// no problem here: vars in a method can be serialized on their own
implicit val sc2: SparkContext = sc
val s1 = "asdf"
sc.parallelize(Array(1, 2, 3)).map(x1 => s1.map(x => 4)).count()
println("in A - works!")
}
}
class B {
// no problem here: B isn't serialized at all because there are no references to its members
implicit val sc2: SparkContext = sc
sc.parallelize(Array(1, 2, 3)).map(x1 => "asdf".map(x => 4)).count()
println("in B - works!")
}
class C extends Serializable {
implicit val sc2: SparkContext = sc
val s1 = "asdf" // to serialize s1, Spark will try serializing the YYY instance, which will serialize sc2
sc.parallelize(Array(1, 2, 3)).map(x1 => s1.map(x => 4)).count() // fails
}
}
底线——隐含与否,当且仅当 s1
和 sc2
是 class 的成员时,这将失败,这意味着 class 会必须序列化并且 "drag" 它们都将被序列化。
范围是spark-shell REPL。在这种情况下,sc2(以及在顶级 REPL 作用域中定义的任何其他隐式 val)只有在它是隐式的并且在 RDD 操作中使用该作用域的另一个 val 时才会被序列化。这是因为隐式值需要全局可用,因此会自动序列化到所有工作节点。
我不清楚为什么(不可序列化的)隐式 val 在此处被序列化(抛出异常):
implicit val sc2:SparkContext = sc
val s1 = "asdf"
sc.parallelize(Array(1,2,3)).map(x1 => s1.map(x => 4))
但不是当 s1 的值在闭包范围内时:
implicit val sc2:SparkContext = sc
sc.parallelize(Array(1,2,3)).map(x1 => "asdf".map(x => 4))
我的用例显然更复杂,但我已将其归结为这个问题。
(解决方案是将隐式val定义为@transient)
这取决于这些行所在的范围:
让我们来看看三个选项 - 在 方法 中,在 class 中没有 s1
,并在 class 和 s1
:
object TTT {
val sc = new SparkContext("local", "test")
def main(args: Array[String]): Unit = {
new A().foo() // works
new B // works
new C // fails
}
class A {
def foo(): Unit = {
// no problem here: vars in a method can be serialized on their own
implicit val sc2: SparkContext = sc
val s1 = "asdf"
sc.parallelize(Array(1, 2, 3)).map(x1 => s1.map(x => 4)).count()
println("in A - works!")
}
}
class B {
// no problem here: B isn't serialized at all because there are no references to its members
implicit val sc2: SparkContext = sc
sc.parallelize(Array(1, 2, 3)).map(x1 => "asdf".map(x => 4)).count()
println("in B - works!")
}
class C extends Serializable {
implicit val sc2: SparkContext = sc
val s1 = "asdf" // to serialize s1, Spark will try serializing the YYY instance, which will serialize sc2
sc.parallelize(Array(1, 2, 3)).map(x1 => s1.map(x => 4)).count() // fails
}
}
底线——隐含与否,当且仅当 s1
和 sc2
是 class 的成员时,这将失败,这意味着 class 会必须序列化并且 "drag" 它们都将被序列化。
范围是spark-shell REPL。在这种情况下,sc2(以及在顶级 REPL 作用域中定义的任何其他隐式 val)只有在它是隐式的并且在 RDD 操作中使用该作用域的另一个 val 时才会被序列化。这是因为隐式值需要全局可用,因此会自动序列化到所有工作节点。