Spark mapPartitions 与 transient lazy val

Spark mapPartitions vs transient lazy val

我想知道使用 spark mapPartitions 功能与使用 transient lazy val 有什么不同。
由于每个分区基本上 运行 在不同的节点上,因此将在每个节点上创建一个 transient lazy val 实例(假设它在一个对象中)。

例如:

class NotSerializable(v: Int) {
  def foo(a: Int) = ???
}

object OnePerPartition {
  @transient lazy val obj: NotSerializable = new NotSerializable(10)
}

object Test extends App{
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1 to 100000)

    rdd.map(OnePerPartition.obj.foo)

    // ---------- VS ----------

    rdd.mapPartitions(itr => {
      val obj = new NotSerializable(10)
      itr.map(obj.foo)
    })
}

有人可能会问你为什么要它...
我想为 运行 我在任何通用集合实现上的逻辑(RDDListscalding pipe 等)
创建一个通用容器概念 他们都有一个"map"的概念,但是mapPartition对于spark是唯一的。

首先这里不需要transientlazy。使用 object 包装器足以完成这项工作,您实际上可以将其写为:

object OnePerExecutor {
  val obj: NotSerializable = new NotSerializable(10)
}

对象包装器与在 mapPartitions 中初始化 NotSerializable 之间存在根本区别。这个:

rdd.mapPartitions(iter => {
  val ns = NotSerializable(1)
  ???
})

每个分区创建一个 NotSerializable 实例。

另一方面,对象包装器为每个执行程序 JVM 创建一个 NotSerializable 实例。结果这个实例:

  • 可用于处理多个分区。
  • 可以被多个执行线程同时访问。
  • 生命周期超过函数调用的地方。

这意味着它应该是线程安全的,任何方法调用都应该没有副作用。