如何拦截驱动程序上累加器的部分更新?

How to intercept partial updates to accumulators on driver?

Spark 1.5.1 + Java1.8

我们正在使用 spark 将大量记录上传到数据库。

操作代码如下所示:

rdd.foreachPartition(new VoidFunction<Iterator<T>>() {

     @Override
     public void call(Iterator<T> iter) {
          //while there are more records perform the following every 1000 records
          //int[] recoords = statement.executeBatch();
          //accumulator.add(recoords.length);
     }
     // ...
} 

驱动节点上有一个线程监控累加器值。但是该值不会更新。它只会在应用程序结束时更新一次。即使累加器使用惰性值设置,它也应该正确更新,因为我在驱动程序节点线程中定期读取值。

我是不是用错了累加器?无论如何,我可以更持续地监控我的工人的进度吗?

您可以监控累加器值,但不能连续监控,即更新发生在任务完成后。

虽然累加器被称为共享变量,但实际上并没有共享。每个任务都有自己的累加器,在任务完成后合并。这意味着当任务 运行.

时无法更新全局值

为了能够看到更新,执行者的数量必须少于已处理分区的数量(对应于任务的数量)。这样做的原因是在将累加器更新发送到驱动程序时引入 "barrier"。

例如:

import org.apache.spark.{SparkConf, SparkContext}

object App {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[4]")
    val sc = new SparkContext(conf)

    val accum = sc.accumulator(0, "An Accumulator")
    val rdd = sc.parallelize(1 to 1000, 20)

    import scala.concurrent.duration._
    import scala.language.postfixOps
    import rx.lang.scala._

    val o = Observable.interval(1000 millis).take(1000)
    val s = o.subscribe(_ => println(accum.value))
    rdd.foreach(x => {
      Thread.sleep(x + 200)
      accum += 1
    })
    s.unsubscribe
    sc.stop
  }
}

如您所见,每个任务只更新一次全局值。

如果您按照提供的示例创建命名累加器,您也可以使用 Spark UI 监控它的状态。只需打开阶段选项卡,导航到特定阶段并检查累加器部分。

Is there anyway I can more continuously monitor progress from my workers?

最可靠的方法是通过添加更多分区来增加粒度,但这并不便宜。