同步 Spark 任务
Synchronize Spark tasks
我的 Spark 进程有一个奇怪的行为,如下所示:
- 读取参考文件,将其加载为
map M
并广播地图
- 读取第二个文件并将其转换为
M
个值
问题是调用M
进行转换是在广播结束之前完成的,然后它没有按预期工作,因为此时M
是空的。
我的问题是:如何让 Spark 在使用对象之前等待对象的加载和广播?
注意:我查看了通话时间,通话确实不同步。那么,M真的加载正确了。
参考文件加载:
object RefObjectUtil extends java.io.Serializable {
var map = Map[String, RefObject]()
def loadAndBroadcast(inputPath: String, sc: SparkContext) = {
val data = read(inputPath, sc).collect
val i = data.iterator
while (i.hasNext) {
val a = i.next
map.put(a.getValue.toString, a)
}
sc.broadcast(map)
}
@throws(classOf[RefObject])
def get(key: String): RefObject= {
isLoaded()
map.getOrElse(key, null)
}
private def isLoaded(): Unit = {
if(map.isEmpty) {
throw new RefObjectException("file has not been loaded. Add it into path argument of your job")
}
}
然后调用:
def run() {
RefObjectUtil.loadAndBroadcast(inputPath, sc)
val data = read(inputToTransformPath, sc)
transform(data) //called without waiting end of refObject loading
}
谢谢。
问题代码不起作用的原因与进程同步无关。该代码可能在驱动程序中正确执行,但 RefObjectUtil
是一个单例对象,它会在每个执行程序中被引用时再次初始化。那时,var map = Map[String, RefObject]()
被初始化并且将不包含任何数据。
看起来在一个程序中有两种 Spark 数据共享技术组合不当。
我们要么使用 'singleton object' 模式来保存每个执行程序的数据,要么我们 assemble 驱动程序中的数据并使用广播将其发送给执行程序。
'singleton object' 模式更适合处理有状态的资源,如数据库连接,这里似乎不是这种情况。广播变量将是更好的选择。
让我们试着用一个例子来说明用法:
受到问题中代码的启发,这段代码加载了一些数据,从中创建了一个地图并广播了它。然后将该地图用于 resolve/translate 更大的数据集。请注意如何使用 broadcastVar.value
.
在闭包中访问广播变量的内容
// get data into map form
val dataMap = val data = read(inputPath, sc).map{elem => (a.getValue.toString, a)}.collectAsMap
// broadcast the data
val broadcastMap = sc.broadcast(dataMap)
val dataToProcessRDD = ???
dataToProcessRDD.flatMap{item =>
broadcastMap.value.get(item) //broadcastMap.value => returns the wrapped map instance
}
我的 Spark 进程有一个奇怪的行为,如下所示:
- 读取参考文件,将其加载为
map M
并广播地图 - 读取第二个文件并将其转换为
M
个值
问题是调用M
进行转换是在广播结束之前完成的,然后它没有按预期工作,因为此时M
是空的。
我的问题是:如何让 Spark 在使用对象之前等待对象的加载和广播?
注意:我查看了通话时间,通话确实不同步。那么,M真的加载正确了。
参考文件加载:
object RefObjectUtil extends java.io.Serializable {
var map = Map[String, RefObject]()
def loadAndBroadcast(inputPath: String, sc: SparkContext) = {
val data = read(inputPath, sc).collect
val i = data.iterator
while (i.hasNext) {
val a = i.next
map.put(a.getValue.toString, a)
}
sc.broadcast(map)
}
@throws(classOf[RefObject])
def get(key: String): RefObject= {
isLoaded()
map.getOrElse(key, null)
}
private def isLoaded(): Unit = {
if(map.isEmpty) {
throw new RefObjectException("file has not been loaded. Add it into path argument of your job")
}
}
然后调用:
def run() {
RefObjectUtil.loadAndBroadcast(inputPath, sc)
val data = read(inputToTransformPath, sc)
transform(data) //called without waiting end of refObject loading
}
谢谢。
问题代码不起作用的原因与进程同步无关。该代码可能在驱动程序中正确执行,但 RefObjectUtil
是一个单例对象,它会在每个执行程序中被引用时再次初始化。那时,var map = Map[String, RefObject]()
被初始化并且将不包含任何数据。
看起来在一个程序中有两种 Spark 数据共享技术组合不当。 我们要么使用 'singleton object' 模式来保存每个执行程序的数据,要么我们 assemble 驱动程序中的数据并使用广播将其发送给执行程序。
'singleton object' 模式更适合处理有状态的资源,如数据库连接,这里似乎不是这种情况。广播变量将是更好的选择。
让我们试着用一个例子来说明用法:
受到问题中代码的启发,这段代码加载了一些数据,从中创建了一个地图并广播了它。然后将该地图用于 resolve/translate 更大的数据集。请注意如何使用 broadcastVar.value
.
// get data into map form
val dataMap = val data = read(inputPath, sc).map{elem => (a.getValue.toString, a)}.collectAsMap
// broadcast the data
val broadcastMap = sc.broadcast(dataMap)
val dataToProcessRDD = ???
dataToProcessRDD.flatMap{item =>
broadcastMap.value.get(item) //broadcastMap.value => returns the wrapped map instance
}