广播关于 spark 作业的更新
Broadcasting updates on spark jobs
我看到这里问过这个问题,但他们主要关注火花流,我找不到适合批处理的解决方案。这个想法是循环几天,并且在每个 iteration/day 它更新前一天的信息(用于当前迭代)。代码如下所示:
var prevIterDataRdd = // some RDD
days.foreach(folder => {
val previousData : Map[String, Double] = parseResult(prevIterDataRdd)
val broadcastMap = sc.broadcast(previousData)
val (result, previousStatus) =
processFolder(folder, broadcastMap)
// store result
result.write.csv(outputPath)
// updating the RDD that enables me to extract previousData to update broadcast
val passingPrevStatus = prevIterDataRdd.subtractByKey(previousStatus)
prevIterDataRdd = previousStatus.union(passingPrevStatus)
broadcastMap.unpersist(true)
broadcastMap.destroy()
})
使用 broadcastMap.destroy()
不会 运行 因为它不允许我再次使用 broadcastMap(我实际上不明白,因为它应该完全不相关 - 不可变)。
我应该如何 运行 这个循环并在每次迭代时更新广播变量?
当使用方法 unpersist
时,我传递了 true
参数以使其阻塞。 sc.broadcast()
也是阻塞的吗?
我真的需要unpersist()
如果我要再次广播吗?
既然我正在创建一个新的广播变量,为什么我在使用 destroy
后不能再次使用广播?
广播变量不可变,但您可以创建新的广播变量。
这个新的广播变量可以在下一次迭代中使用。
您需要做的就是更改对新创建广播的引用,从执行器中取消保留旧广播并从驱动程序中销毁它。
在class级别定义变量,这将允许您更改驱动程序中广播变量的引用并使用销毁方法。
object Main extends App {
// defined and initialized at class level to allow reference change
var previousData: Map[String, Double] = null
override def main(args: Array[String]): Unit = {
//your code
}
}
不允许您对变量使用 destroy 方法,因为驱动程序中不再存在该引用。更改对新广播变量的引用可以解决问题。
Unpersist 仅从执行器中删除数据,因此当重新访问变量时,驱动程序将其重新发送给执行器。
blocking = true
将允许您让应用程序在下次访问之前从执行器中完全删除数据。
sc.broadcast()
- 没有官方文档说是blocking。虽然一旦它被调用,应用程序就会在代码的下一行 运行 之前开始向执行者广播数据。因此,如果数据非常大,它可能会减慢您的应用程序。因此,请充分注意您的使用方式。
在销毁之前调用 unpersist 是一个好的做法。这将帮助您从执行程序和驱动程序中完全删除数据。
我看到这里问过这个问题,但他们主要关注火花流,我找不到适合批处理的解决方案。这个想法是循环几天,并且在每个 iteration/day 它更新前一天的信息(用于当前迭代)。代码如下所示:
var prevIterDataRdd = // some RDD
days.foreach(folder => {
val previousData : Map[String, Double] = parseResult(prevIterDataRdd)
val broadcastMap = sc.broadcast(previousData)
val (result, previousStatus) =
processFolder(folder, broadcastMap)
// store result
result.write.csv(outputPath)
// updating the RDD that enables me to extract previousData to update broadcast
val passingPrevStatus = prevIterDataRdd.subtractByKey(previousStatus)
prevIterDataRdd = previousStatus.union(passingPrevStatus)
broadcastMap.unpersist(true)
broadcastMap.destroy()
})
使用 broadcastMap.destroy()
不会 运行 因为它不允许我再次使用 broadcastMap(我实际上不明白,因为它应该完全不相关 - 不可变)。
我应该如何 运行 这个循环并在每次迭代时更新广播变量?
当使用方法 unpersist
时,我传递了 true
参数以使其阻塞。 sc.broadcast()
也是阻塞的吗?
我真的需要unpersist()
如果我要再次广播吗?
既然我正在创建一个新的广播变量,为什么我在使用 destroy
后不能再次使用广播?
广播变量不可变,但您可以创建新的广播变量。 这个新的广播变量可以在下一次迭代中使用。
您需要做的就是更改对新创建广播的引用,从执行器中取消保留旧广播并从驱动程序中销毁它。
在class级别定义变量,这将允许您更改驱动程序中广播变量的引用并使用销毁方法。
object Main extends App {
// defined and initialized at class level to allow reference change
var previousData: Map[String, Double] = null
override def main(args: Array[String]): Unit = {
//your code
}
}
不允许您对变量使用 destroy 方法,因为驱动程序中不再存在该引用。更改对新广播变量的引用可以解决问题。
Unpersist 仅从执行器中删除数据,因此当重新访问变量时,驱动程序将其重新发送给执行器。
blocking = true
将允许您让应用程序在下次访问之前从执行器中完全删除数据。
sc.broadcast()
- 没有官方文档说是blocking。虽然一旦它被调用,应用程序就会在代码的下一行 运行 之前开始向执行者广播数据。因此,如果数据非常大,它可能会减慢您的应用程序。因此,请充分注意您的使用方式。
在销毁之前调用 unpersist 是一个好的做法。这将帮助您从执行程序和驱动程序中完全删除数据。