如何拦截驱动程序上累加器的部分更新?
How to intercept partial updates to accumulators on driver?
Spark 1.5.1 + Java1.8
我们正在使用 spark 将大量记录上传到数据库。
操作代码如下所示:
rdd.foreachPartition(new VoidFunction<Iterator<T>>() {
@Override
public void call(Iterator<T> iter) {
//while there are more records perform the following every 1000 records
//int[] recoords = statement.executeBatch();
//accumulator.add(recoords.length);
}
// ...
}
驱动节点上有一个线程监控累加器值。但是该值不会更新。它只会在应用程序结束时更新一次。即使累加器使用惰性值设置,它也应该正确更新,因为我在驱动程序节点线程中定期读取值。
我是不是用错了累加器?无论如何,我可以更持续地监控我的工人的进度吗?
您可以监控累加器值,但不能连续监控,即更新发生在任务完成后。
虽然累加器被称为共享变量,但实际上并没有共享。每个任务都有自己的累加器,在任务完成后合并。这意味着当任务 运行.
时无法更新全局值
为了能够看到更新,执行者的数量必须少于已处理分区的数量(对应于任务的数量)。这样做的原因是在将累加器更新发送到驱动程序时引入 "barrier"。
例如:
import org.apache.spark.{SparkConf, SparkContext}
object App {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]")
val sc = new SparkContext(conf)
val accum = sc.accumulator(0, "An Accumulator")
val rdd = sc.parallelize(1 to 1000, 20)
import scala.concurrent.duration._
import scala.language.postfixOps
import rx.lang.scala._
val o = Observable.interval(1000 millis).take(1000)
val s = o.subscribe(_ => println(accum.value))
rdd.foreach(x => {
Thread.sleep(x + 200)
accum += 1
})
s.unsubscribe
sc.stop
}
}
如您所见,每个任务只更新一次全局值。
如果您按照提供的示例创建命名累加器,您也可以使用 Spark UI 监控它的状态。只需打开阶段选项卡,导航到特定阶段并检查累加器部分。
Is there anyway I can more continuously monitor progress from my workers?
最可靠的方法是通过添加更多分区来增加粒度,但这并不便宜。
Spark 1.5.1 + Java1.8
我们正在使用 spark 将大量记录上传到数据库。
操作代码如下所示:
rdd.foreachPartition(new VoidFunction<Iterator<T>>() {
@Override
public void call(Iterator<T> iter) {
//while there are more records perform the following every 1000 records
//int[] recoords = statement.executeBatch();
//accumulator.add(recoords.length);
}
// ...
}
驱动节点上有一个线程监控累加器值。但是该值不会更新。它只会在应用程序结束时更新一次。即使累加器使用惰性值设置,它也应该正确更新,因为我在驱动程序节点线程中定期读取值。
我是不是用错了累加器?无论如何,我可以更持续地监控我的工人的进度吗?
您可以监控累加器值,但不能连续监控,即更新发生在任务完成后。
虽然累加器被称为共享变量,但实际上并没有共享。每个任务都有自己的累加器,在任务完成后合并。这意味着当任务 运行.
时无法更新全局值为了能够看到更新,执行者的数量必须少于已处理分区的数量(对应于任务的数量)。这样做的原因是在将累加器更新发送到驱动程序时引入 "barrier"。
例如:
import org.apache.spark.{SparkConf, SparkContext}
object App {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]")
val sc = new SparkContext(conf)
val accum = sc.accumulator(0, "An Accumulator")
val rdd = sc.parallelize(1 to 1000, 20)
import scala.concurrent.duration._
import scala.language.postfixOps
import rx.lang.scala._
val o = Observable.interval(1000 millis).take(1000)
val s = o.subscribe(_ => println(accum.value))
rdd.foreach(x => {
Thread.sleep(x + 200)
accum += 1
})
s.unsubscribe
sc.stop
}
}
如您所见,每个任务只更新一次全局值。
如果您按照提供的示例创建命名累加器,您也可以使用 Spark UI 监控它的状态。只需打开阶段选项卡,导航到特定阶段并检查累加器部分。
Is there anyway I can more continuously monitor progress from my workers?
最可靠的方法是通过添加更多分区来增加粒度,但这并不便宜。