Spark程序性能——GC & Task Deserialization & Concurrent execution

Spark program performance - GC & Task Deserialization & Concurrent execution

我有一个集群,有4台机器,1台master,3台worker,每台128G内存,64核。我在独立模式下使用 Spark 1.5.0。我的程序使用 JDBC 从 Oracle 表中读取数据,然后执行 ETL、操作数据并执行机器学习任务,如 k-means。

我有一个 DataFrame (myDF.cache()),它与其他两个 DataFrame 连接结果并缓存。 DataFrame 包含 2700 万行,数据大小约为 1.5G。我需要过滤数据并计算 24 个直方图,如下所示:

val h1 = myDF.filter("pmod(idx, 24) = 0").select("col1").histogram(arrBucket) 
val h2 = myDF.filter("pmod(idx, 24) = 1").select("col1").histogram(arrBucket) 
// ...... 
val h24 = myDF.filter("pmod(idx, 24) = 23").select("col1").histogram(arrBucket) 

问题:

  1. 由于我的 DataFrame 已缓存,我希望过滤器 select 和直方图非常快。但是每次计算的实际时间约为7秒,这是不能接受的。从 UI,它显示 GC 时间需要 5 秒,任务反序列化时间需要 4 秒。我尝试了不同的 JVM 参数,但无法进一步改进。现在我正在使用

    -Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
    -XX:ParallelGCThreads=32 \
    -XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70 
    

令我困惑的是,与可用内存相比,数据的大小是微不足道的。为什么每次 filter/select/histogram 运行ning 都会启动 GC?有什么办法可以减少 GC 时间和任务反序列化时间吗?

  1. 我必须对 h[1-24] 进行并行计算,而不是顺序计算。我试过 Future,比如:

    import scala.concurrent.{Await, Future, blocking} 
    
    import scala.concurrent.ExecutionContext.Implicits.global 
    
    val f1  = Future{myDF.filter("pmod(idx, 24) = 1").count} 
    val f2  = Future{myDF.filter("pmod(idx, 24) = 2").count} 
    val f3  = Future{myDF.filter("pmod(idx, 24) = 3").count} 
    
    val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield { 
      c1 + c2 + c3 
    } 
    
    val summ = Await.result(future, 180 second) 
    

问题是这里的 Future 仅意味着作业几乎同时提交给调度程序,而不是它们最终被调度并且 运行 同时。这里使用的 Future 根本不会提高性能。

如何同时进行24个计算作业运行?

您可以尝试几件事:

  1. 不要重新计算 pmod(idx, 24)。相反,您可以简单地计算一次:

    import org.apache.spark.sql.functions.{pmod, lit}
    
    val myDfWithBuckets = myDF.withColumn("bucket", pmod($"idx", lit(24)))
    
  2. 使用 SQLContext.cacheTable 而不是 cache。它使用压缩的列式存储存储 table,可用于仅访问所需的列,如 Spark SQL and DataFrame Guide 中所述“ 将自动调整压缩以最小化内存使用和 GC 压力".

    myDfWithBuckets.registerTempTable("myDfWithBuckets")
    sqlContext.cacheTable("myDfWithBuckets")
    
  3. 如果可以,只缓存你实际需要的列,而不是每次都投影。

  4. 我不清楚 histogram 方法的来源是什么(您是否转换为 RDD[Double] 并使用 DoubleRDDFunctions.histogram?)以及什么是参数,但如果您想同时计算所有直方图,您可以尝试 groupBy 存储桶并应用一次直方图,例如使用 histogram_numeric UDF:

    import org.apache.spark.sql.functions.callUDF
    
    val n: Int = ???
    
    myDfWithBuckets
      .groupBy($"bucket")
      .agg(callUDF("histogram_numeric", $"col1", lit(n)))
    

    如果您使用预定义的范围,您可以使用自定义 UDF 获得类似的效果。

注释

  • 如何提取由histogram_numeric计算的值?首先让我们创建一个小助手

    import org.apache.spark.sql.Row
    
    def extractBuckets(xs: Seq[Row]): Seq[(Double, Double)] =
      xs.map(x => (x.getDouble(0), x.getDouble(1)))
    

    现在我们可以使用模式匹配映射如下:

    import org.apache.spark.rdd.RDD
    
    val histogramsRDD: RDD[(Int, Seq[(Double, Double)])] = histograms.map{
      case Row(k: Int, hs: Seq[Row @unchecked]) => (k, extractBuckets(hs)) }