spark 2.0 parallel JobProgressListener fails miserably

I have a scenario where I need to trigger many SQL queries in parallel from for loops and collect the resulting lists into a ListBuffer. However, I hit many errors during the run and the results are incomplete. To illustrate, I made a very simple reproducible example:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter = 0:Int
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig ) {
   for (i2 <- dig ) {
     for (i3 <- dig ) {
        println("||==="+i1+"=="+i2+"=="+i3+"===="+(i1*100+i2*10+i3*1)+"===="+counter+"=======||")
        counter +=1
        results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"','"+(i1*100+i2*10+i3*1)+"','"+counter+"',*  from df ").collect().toList
       }
     }
   }
results(0).take(2).foreach(println)
results.size
results.flatten.size

The code above simply counts from 0 to 999 and, on each count, appends to the ListBuffer a list of the 2 rows from the cached table, together with a 'serial' counter value for comparison.

Running the code yields output like:

||===9==8==3====983====969=======||
||===9==8==5====985====969=======||
||===9==8==1====981====969=======||
||===9==8==2====982====969=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 784
||===9==8==7====987====974=======||
||===5==8==9====589====975=======||
||===9==8==4====984====976=======||
||===9==8==6====986====976=======||
||===9==8==9====989====977=======||
||===9==8==8====988====977=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 773
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 790
||===5==9==0====590====980=======||
||===5==9==2====592====980=======||
||===5==9==5====595====980=======||
||===5==9==1====591====980=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 795
||===5==9==3====593====984=======||
||===5==9==7====597====985=======||
||===5==9==8====598====985=======||
||===5==9==6====596====987=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 798
||===5==9==9====599====988=======||
||===5==9==4====594====989=======||
||===9==9==0====990====990=======||
||===9==9==5====995====991=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 784
||===9==9==2====992====992=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 789
||===9==9==3====993====993=======||
||===9==9==1====991====994=======||
||===9==9==4====994====995=======||
||===9==9==7====997====996=======||
||===9==9==8====998====997=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 790
||===9==9==6====996====998=======||
||===9==9==9====999====999=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 805
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 798

scala> results(0).take(2).foreach(println)
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 802
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 805
[trial,0,0,0,0,16,a]
[trial,0,0,0,0,16,b]

scala> results.size
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 839
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 840
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 839
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 842
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 855
res3: Int = 1000

scala> results.flatten.size
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 860
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 854
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 860
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 868
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 874
res4: Int = 2000
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 882

scala> 

[Stage 589:=(28 + 0) / 28][Stage 590:>(27 + 1) / 28][Stage 591:>(20 + 7) / 28]16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 888
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 895
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 898
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 898
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 905
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 906
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 907
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 902
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 905
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 913
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 915
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 916
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 913
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 920
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 942
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 946
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 942
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 946
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 948
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 956
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 952
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 965
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 965
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 966
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 976
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 976
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 990
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 999


scala> 

These are just some of the WARNs I get.

You can see that the counter 'staggers' at times.

**This is where the trouble starts**

Lots of warnings, but results.size = 1000 and results.flatten.size = 2000, as expected.

However, trying to count to 10000 the same way produces many more warnings:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter = 0:Int
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig ) {
   for (i2 <- dig ) {
     for (i3 <- dig ) {
       for (i4 <- dig ) {
         println("||==="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=======||")
         counter +=1
         results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
       }
     }
   }
 }
results(0).take(2).foreach(println)
results.size
results.flatten.size

Output:

16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8797
||===0==9==4==3====943====9998=======||
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8799
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8801
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8802
||===0==9==4==4====944====9999=======||
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8803
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8804
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8805
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8806

The results:

scala> results(0).take(2).foreach(println)
[trial,3,0,0,0,3000,7,a]
[trial,3,0,0,0,3000,7,b]

scala> results.size
res3: Int = 9999

scala> results.flatten.size
res4: Int = 19998

One value is missing.

I invite you to try the following code, which counts to 100000:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter = 0:Int
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i0 <- dig ) {
  for (i1 <- dig ) {
    for (i2 <- dig ) {
      for (i3 <- dig ) {
        for (i4 <- dig ) {
          println("============="+i0+"=="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=========") 
          counter +=1
          results += spark.sql("select 'trial','"+i0+"','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
        }
      }
    }
  }
}

Not only do I get tons of JobProgressListener warnings during the run, but the results are incomplete and non-deterministic:

scala> results(0).take(2).foreach(println)
[trial,8,5,0,0,0,85000,13,a]
[trial,8,5,0,0,0,85000,13,b]

scala> results.size
res3: Int = 99999

scala> results.flatten.size
res4: Int = 192908

In my real-life example, I frequently get a "spark.sql.execution.id is already set" exception at random points during the run.

How can I solve this?

I have tried

spark.conf.set("spark.extraListeners","org.apache.spark.scheduler.StatsReportListener,org.apache.spark.scheduler.EventLoggingListener")

and read Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set,

Apache Spark: network errors between executors

and http://docs.scala-lang.org/overviews/parallel-collections/overview.html on side-effecting operations, but there seem to be too many possible directions.

IMHO, the bug most relevant to this issue is https://issues.apache.org/jira/browse/SPARK-10548, which should have been resolved in Spark 1.6.

Can anyone offer some hints for tackling this situation? The complexity of my real-life case is similar to the 100000 count, and execution fails at random stages.

I deployed a Google Cloud Dataproc cluster:

gcloud dataproc clusters create clusTest --zone us-central1-b --master-machine-type n1-highmem-16 --num-workers 2 --worker-machine-type n1-highmem-8 --num-worker-local-ssds 2 --num-preemptible-workers 8 --scopes 'https://www.googleapis.com/auth/cloud-platform' --project xyz-analytics

> the results are incomplete and non-deterministic

The non-deterministic part should give a hint. When adding results to the ListBuffer, you run into a race condition: parallel updates to it are not really thread-safe, so if the run is long enough you will eventually lose some results.
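
The loss can be reproduced without Spark at all. In this minimal sketch (plain Scala, illustrative names; not the original code), several threads append to a shared ListBuffer. Without a lock the final size frequently ends up below the expected total (a racing thread may even crash), while the locked variant is always complete:

```scala
import scala.collection.mutable.ListBuffer

object ListBufferRace {
  val ThreadCount = 8
  val PerThread   = 10000

  // Run `append` from ThreadCount threads, PerThread calls each.
  def run(append: Int => Unit): Unit = {
    val threads = (0 until ThreadCount).map { t =>
      new Thread(() => (0 until PerThread).foreach(i => append(t * PerThread + i)))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }

  def main(args: Array[String]): Unit = {
    val unsafe = ListBuffer[Int]()
    run(i => unsafe += i)
    // frequently prints less than 80000: concurrent += on ListBuffer loses appends
    println(s"unsynchronized: ${unsafe.size}")

    val safe = ListBuffer[Int]()
    val lock = new Object
    run(i => lock.synchronized { safe += i })
    println(s"synchronized:   ${safe.size}") // always 80000
  }
}
```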

I tried it locally and could reproduce the incomplete results. Simply adding a synchronized block around the append to the Buffer makes the results complete. You could also use another thread-safe data structure for the job, so you don't need an explicit synchronized block, e.g. java.util.concurrent.ConcurrentLinkedQueue or similar.
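
For the thread-safe-structure route, a sketch along these lines (plain Scala, illustrative names; in the real code the appended element would be `spark.sql(...).collect().toList`):

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object QueueDemo {
  def main(args: Array[String]): Unit = {
    // Thread-safe out of the box: no explicit synchronized block needed.
    val results = new ConcurrentLinkedQueue[List[String]]()

    val threads = (0 until 8).map { t =>
      new Thread(() => {
        for (i <- 0 until 1000) {
          // stands in for spark.sql(...).collect().toList
          results.add(List(s"trial-$t-$i"))
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    println(results.size) // 8000: no appends are lost
  }
}
```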

So the following fixes the problem:

for (i1 <- dig) {
  for (i2 <- dig) {
    for (i3 <- dig) {
      for (i4 <- dig) {
        counter += 1
        val result = spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
        // guard the shared ListBuffer against concurrent appends
        synchronized {
          results += result
        }
      }
    }
  }
}
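
Note that `counter += 1` above still runs unsynchronized; the 'staggering' counter values observed in the question's output come from the same kind of race on a shared `var`. As a plain-Scala sketch (illustrative names, no Spark), `java.util.concurrent.atomic.AtomicInteger` would hand every iteration a distinct number:

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ConcurrentLinkedQueue

object CounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new AtomicInteger(0)
    val seen = new ConcurrentLinkedQueue[Int]()

    val threads = (0 until 8).map { _ =>
      new Thread(() => {
        for (_ <- 0 until 1000) {
          // getAndIncrement is atomic: no two iterations observe the same value
          seen.add(counter.getAndIncrement())
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    // count how many distinct counter values were handed out
    val distinct = scala.collection.mutable.Set[Int]()
    val it = seen.iterator()
    while (it.hasNext) distinct += it.next()
    println(distinct.size) // 8000 distinct values
  }
}
```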

As for the "spark.sql.execution.id is already set" exception: I could not reproduce it with the example given above. (However, I ran the code above against a local Spark.) Can it be reproduced in a local setup?