Spark mapPartitions 与 transient lazy val
Spark mapPartitions vs transient lazy val
我想知道使用 spark mapPartitions
功能与使用 transient lazy val 有什么不同。
由于每个分区基本上 运行 在不同的节点上,因此将在每个节点上创建一个 transient lazy val 实例(假设它在一个对象中)。
例如:
class NotSerializable(v: Int) {
def foo(a: Int) = ???
}
object OnePerPartition {
@transient lazy val obj: NotSerializable = new NotSerializable(10)
}
object Test extends App{
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(1 to 100000)
rdd.map(OnePerPartition.obj.foo)
// ---------- VS ----------
rdd.mapPartitions(itr => {
val obj = new NotSerializable(10)
itr.map(obj.foo)
})
}
有人可能会问你为什么要它...
我想为 运行 我在任何通用集合实现上的逻辑(RDD
、List
、scalding pipe
等)
创建一个通用容器概念
他们都有一个"map"的概念,但是mapPartition
对于spark
是唯一的。
首先这里不需要transient
lazy
。使用 object
包装器足以完成这项工作,您实际上可以将其写为:
object OnePerExecutor {
val obj: NotSerializable = new NotSerializable(10)
}
对象包装器与在 mapPartitions
中初始化 NotSerializable
之间存在根本区别。这个:
rdd.mapPartitions(iter => {
val ns = NotSerializable(1)
???
})
每个分区创建一个 NotSerializable
实例。
另一方面,对象包装器为每个执行程序 JVM 创建一个 NotSerializable
实例。结果这个实例:
- 可用于处理多个分区。
- 可以被多个执行线程同时访问。
- 生命周期超过函数调用的地方。
这意味着它应该是线程安全的,任何方法调用都应该没有副作用。
我想知道使用 spark mapPartitions
功能与使用 transient lazy val 有什么不同。
由于每个分区基本上 运行 在不同的节点上,因此将在每个节点上创建一个 transient lazy val 实例(假设它在一个对象中)。
例如:
class NotSerializable(v: Int) {
def foo(a: Int) = ???
}
object OnePerPartition {
@transient lazy val obj: NotSerializable = new NotSerializable(10)
}
object Test extends App{
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(1 to 100000)
rdd.map(OnePerPartition.obj.foo)
// ---------- VS ----------
rdd.mapPartitions(itr => {
val obj = new NotSerializable(10)
itr.map(obj.foo)
})
}
有人可能会问你为什么要它...
我想为 运行 我在任何通用集合实现上的逻辑(RDD
、List
、scalding pipe
等)
创建一个通用容器概念
他们都有一个"map"的概念,但是mapPartition
对于spark
是唯一的。
首先这里不需要transient
lazy
。使用 object
包装器足以完成这项工作,您实际上可以将其写为:
object OnePerExecutor {
val obj: NotSerializable = new NotSerializable(10)
}
对象包装器与在 mapPartitions
中初始化 NotSerializable
之间存在根本区别。这个:
rdd.mapPartitions(iter => {
val ns = NotSerializable(1)
???
})
每个分区创建一个 NotSerializable
实例。
另一方面,对象包装器为每个执行程序 JVM 创建一个 NotSerializable
实例。结果这个实例:
- 可用于处理多个分区。
- 可以被多个执行线程同时访问。
- 生命周期超过函数调用的地方。
这意味着它应该是线程安全的,任何方法调用都应该没有副作用。