Spark程序性能——GC & Task Deserialization & Concurrent execution
Spark program performance - GC & Task Deserialization & Concurrent execution
我有一个集群,有4台机器,1台master,3台worker,每台128G内存,64核。我在独立模式下使用 Spark 1.5.0。我的程序使用 JDBC 从 Oracle 表中读取数据,然后执行 ETL、操作数据并执行机器学习任务,如 k-means。
我有一个 DataFrame (myDF.cache()),它与其他两个 DataFrame 连接结果并缓存。 DataFrame 包含 2700 万行,数据大小约为 1.5G。我需要过滤数据并计算 24 个直方图,如下所示:
val h1 = myDF.filter("pmod(idx, 24) = 0").select("col1").histogram(arrBucket)
val h2 = myDF.filter("pmod(idx, 24) = 1").select("col1").histogram(arrBucket)
// ......
val h24 = myDF.filter("pmod(idx, 24) = 23").select("col1").histogram(arrBucket)
问题:
由于我的 DataFrame 已缓存,我希望过滤器 select 和直方图非常快。但是每次计算的实际时间约为7秒,这是不能接受的。从 UI,它显示 GC 时间需要 5 秒,任务反序列化时间需要 4 秒。我尝试了不同的 JVM 参数,但无法进一步改进。现在我正在使用
-Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
-XX:ParallelGCThreads=32 \
-XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70
令我困惑的是,与可用内存相比,数据的大小是微不足道的。为什么每次 filter/select/histogram 运行ning 都会启动 GC?有什么办法可以减少 GC 时间和任务反序列化时间吗?
我必须对 h[1-24] 进行并行计算,而不是顺序计算。我试过 Future,比如:
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
val f1 = Future{myDF.filter("pmod(idx, 24) = 1").count}
val f2 = Future{myDF.filter("pmod(idx, 24) = 2").count}
val f3 = Future{myDF.filter("pmod(idx, 24) = 3").count}
val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
c1 + c2 + c3
}
val summ = Await.result(future, 180 second)
问题是这里的 Future 仅意味着作业几乎同时提交给调度程序,而不是它们最终被调度并且 运行 同时。这里使用的 Future 根本不会提高性能。
如何同时进行24个计算作业运行?
您可以尝试几件事:
不要重新计算 pmod(idx, 24)
。相反,您可以简单地计算一次:
import org.apache.spark.sql.functions.{pmod, lit}
val myDfWithBuckets = myDF.withColumn("bucket", pmod($"idx", lit(24)))
使用 SQLContext.cacheTable
而不是 cache
。它使用压缩的列式存储存储 table,可用于仅访问所需的列,如 Spark SQL and DataFrame Guide 中所述“ 将自动调整压缩以最小化内存使用和 GC 压力".
myDfWithBuckets.registerTempTable("myDfWithBuckets")
sqlContext.cacheTable("myDfWithBuckets")
如果可以,只缓存你实际需要的列,而不是每次都投影。
我不清楚 histogram
方法的来源是什么(您是否转换为 RDD[Double]
并使用 DoubleRDDFunctions.histogram
?)以及什么是参数,但如果您想同时计算所有直方图,您可以尝试 groupBy
存储桶并应用一次直方图,例如使用 histogram_numeric
UDF:
import org.apache.spark.sql.functions.callUDF
val n: Int = ???
myDfWithBuckets
.groupBy($"bucket")
.agg(callUDF("histogram_numeric", $"col1", lit(n)))
如果您使用预定义的范围,您可以使用自定义 UDF 获得类似的效果。
注释
如何提取由histogram_numeric
计算的值?首先让我们创建一个小助手
import org.apache.spark.sql.Row
def extractBuckets(xs: Seq[Row]): Seq[(Double, Double)] =
xs.map(x => (x.getDouble(0), x.getDouble(1)))
现在我们可以使用模式匹配映射如下:
import org.apache.spark.rdd.RDD
val histogramsRDD: RDD[(Int, Seq[(Double, Double)])] = histograms.map{
case Row(k: Int, hs: Seq[Row @unchecked]) => (k, extractBuckets(hs)) }
我有一个集群,有4台机器,1台master,3台worker,每台128G内存,64核。我在独立模式下使用 Spark 1.5.0。我的程序使用 JDBC 从 Oracle 表中读取数据,然后执行 ETL、操作数据并执行机器学习任务,如 k-means。
我有一个 DataFrame (myDF.cache()),它与其他两个 DataFrame 连接结果并缓存。 DataFrame 包含 2700 万行,数据大小约为 1.5G。我需要过滤数据并计算 24 个直方图,如下所示:
val h1 = myDF.filter("pmod(idx, 24) = 0").select("col1").histogram(arrBucket)
val h2 = myDF.filter("pmod(idx, 24) = 1").select("col1").histogram(arrBucket)
// ......
val h24 = myDF.filter("pmod(idx, 24) = 23").select("col1").histogram(arrBucket)
问题:
由于我的 DataFrame 已缓存,我希望过滤器 select 和直方图非常快。但是每次计算的实际时间约为7秒,这是不能接受的。从 UI,它显示 GC 时间需要 5 秒,任务反序列化时间需要 4 秒。我尝试了不同的 JVM 参数,但无法进一步改进。现在我正在使用
-Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \ -XX:ParallelGCThreads=32 \ -XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70
令我困惑的是,与可用内存相比,数据的大小是微不足道的。为什么每次 filter/select/histogram 运行ning 都会启动 GC?有什么办法可以减少 GC 时间和任务反序列化时间吗?
我必须对 h[1-24] 进行并行计算,而不是顺序计算。我试过 Future,比如:
import scala.concurrent.{Await, Future, blocking} import scala.concurrent.ExecutionContext.Implicits.global val f1 = Future{myDF.filter("pmod(idx, 24) = 1").count} val f2 = Future{myDF.filter("pmod(idx, 24) = 2").count} val f3 = Future{myDF.filter("pmod(idx, 24) = 3").count} val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield { c1 + c2 + c3 } val summ = Await.result(future, 180 second)
问题是这里的 Future 仅意味着作业几乎同时提交给调度程序,而不是它们最终被调度并且 运行 同时。这里使用的 Future 根本不会提高性能。
如何同时进行24个计算作业运行?
您可以尝试几件事:
不要重新计算
pmod(idx, 24)
。相反,您可以简单地计算一次:import org.apache.spark.sql.functions.{pmod, lit} val myDfWithBuckets = myDF.withColumn("bucket", pmod($"idx", lit(24)))
使用
SQLContext.cacheTable
而不是cache
。它使用压缩的列式存储存储 table,可用于仅访问所需的列,如 Spark SQL and DataFrame Guide 中所述“ 将自动调整压缩以最小化内存使用和 GC 压力".myDfWithBuckets.registerTempTable("myDfWithBuckets") sqlContext.cacheTable("myDfWithBuckets")
如果可以,只缓存你实际需要的列,而不是每次都投影。
我不清楚
histogram
方法的来源是什么(您是否转换为RDD[Double]
并使用DoubleRDDFunctions.histogram
?)以及什么是参数,但如果您想同时计算所有直方图,您可以尝试groupBy
存储桶并应用一次直方图,例如使用histogram_numeric
UDF:import org.apache.spark.sql.functions.callUDF val n: Int = ??? myDfWithBuckets .groupBy($"bucket") .agg(callUDF("histogram_numeric", $"col1", lit(n)))
如果您使用预定义的范围,您可以使用自定义 UDF 获得类似的效果。
注释
如何提取由
histogram_numeric
计算的值?首先让我们创建一个小助手import org.apache.spark.sql.Row def extractBuckets(xs: Seq[Row]): Seq[(Double, Double)] = xs.map(x => (x.getDouble(0), x.getDouble(1)))
现在我们可以使用模式匹配映射如下:
import org.apache.spark.rdd.RDD val histogramsRDD: RDD[(Int, Seq[(Double, Double)])] = histograms.map{ case Row(k: Int, hs: Seq[Row @unchecked]) => (k, extractBuckets(hs)) }