同步 Spark 任务

Synchronize Spark tasks

我的 Spark 进程有一个奇怪的行为,如下所示:

  1. 读取参考文件,将其加载为 map M 并广播地图
  2. 读取第二个文件并将其转换为 M 个值

问题是调用M进行转换是在广播结束之前完成的,然后它没有按预期工作,因为此时M是空的。

我的问题是:如何让 Spark 在使用对象之前等待对象的加载和广播?

注意:我查看了通话时间,通话确实不同步。那么,M真的加载正确了。

参考文件加载:

  object RefObjectUtil extends java.io.Serializable {

  var map = Map[String, RefObject]()

  def loadAndBroadcast(inputPath: String, sc: SparkContext) = {
    val data = read(inputPath, sc).collect

    val i = data.iterator
    while (i.hasNext) {
      val a = i.next
      map.put(a.getValue.toString, a)
    }
    sc.broadcast(map)
  }

  @throws(classOf[RefObject])
  def get(key: String): RefObject= {
    isLoaded()
    map.getOrElse(key, null)
  }


  private def isLoaded(): Unit = {
    if(map.isEmpty) {
      throw new RefObjectException("file has not been loaded. Add it into path argument of your job")
    }
  }

然后调用:

 def run() {
       RefObjectUtil.loadAndBroadcast(inputPath, sc)
       val data = read(inputToTransformPath, sc)
       transform(data)  //called without waiting end of refObject loading 
    }

谢谢。

问题代码不起作用的原因与进程同步无关。该代码可能在驱动程序中正确执行,但 RefObjectUtil 是一个单例对象,它会在每个执行程序中被引用时再次初始化。那时,var map = Map[String, RefObject]() 被初始化并且将不包含任何数据。

看起来在一个程序中有两种 Spark 数据共享技术组合不当。 我们要么使用 'singleton object' 模式来保存每个执行程序的数据,要么我们 assemble 驱动程序中的数据并使用广播将其发送给执行程序。

'singleton object' 模式更适合处理有状态的资源,如数据库连接,这里似乎不是这种情况。广播变量将是更好的选择。

让我们试着用一个例子来说明用法:

受到问题中代码的启发,这段代码加载了一些数据,从中创建了一个地图并广播了它。然后将该地图用于 resolve/translate 更大的数据集。请注意如何使用 broadcastVar.value.

在闭包中访问广播变量的内容
// get data into map form
val dataMap =  val data = read(inputPath, sc).map{elem => (a.getValue.toString, a)}.collectAsMap
// broadcast the data
val broadcastMap = sc.broadcast(dataMap)

val dataToProcessRDD = ???
dataToProcessRDD.flatMap{item =>
    broadcastMap.value.get(item) //broadcastMap.value => returns the wrapped map instance
}