有没有办法优化 spark sql 代码?
is there a way to optimize spark sql code?
更新:
我正在使用 spark sql 1.5.2。尝试读取许多 parquet 文件并过滤和聚合行 - 我的 hdfs 中的 ~30 个文件中存储了 ~35M 行,处理时间超过 10 分钟
val logins_12 = sqlContext.read.parquet("events/2015/12/*/login")
val l_12 = logins_12.where("event_data.level >= 90").select(
"pid",
"timestamp",
"event_data.level"
).withColumn("event_date", to_date(logins_12("timestamp"))).drop("timestamp").toDF("pid", "level", "event_date").groupBy("pid", "event_date").agg(Map("level"->"max")).toDF("pid", "event_date", "level")
l_12.first()
我的 spark 运行 在两个节点集群中,每个集群有 8 个核心和 16Gb 内存,scala 输出让我认为计算只在一个线程中运行:
scala> x.first()
[Stage 1:=======> (50 + 1) / 368]
当我尝试使用 count() 而不是 first() 时,看起来好像有两个线程在进行计算。这仍然比我预期的要少,因为有大约 30 个文件可以并行处理
scala> l_12.count()
[Stage 4:=====> (34 + 2) / 368]
我在 yarn-client 模式下启动 spark 控制台,14g 用于执行程序,4g 用于驱动程序
./bin/spark-shell -Dspark.executor.memory=14g -Dspark.driver.memory=4g --master yarn-client
我的 spark 默认配置:
spark.executor.memory 2g
spark.logConf true
spark.eventLog.dir maprfs:///apps/spark
spark.eventLog.enabled true
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni
spark.executor.extraClassPath
spark.yarn.historyServer.address http://test-01:18080
rdd有200个分区
scala> logins_12.rdd.partitions.size
res2: Int = 368
scala> l_12.rdd.partitions.size
res0: Int = 200
有没有办法优化这段代码?
谢谢
这两种行为或多或少都在意料之中。 Spark 相当懒惰,它不仅不会执行转换,除非你触发一个动作,而且如果不需要输出,它也可以跳过任务。由于 first
只需要一个元素,因此它只能计算一个分区。这很可能是您有时只看到一个 运行 线程的原因。
关于第二个问题,很可能是配置问题。假设 YARN 配置没有问题(我不使用 YARN,但 yarn.nodemanager.resource.cpu-vcores
看起来可能是问题的根源),这很可能是 Spark 默认设置的问题。正如您在 Configuration guide 中所读,Yarn 上的 spark.executor.cores
默认设置为 1。两个工作人员提供两个 运行 线程。
更新:
我正在使用 spark sql 1.5.2。尝试读取许多 parquet 文件并过滤和聚合行 - 我的 hdfs 中的 ~30 个文件中存储了 ~35M 行,处理时间超过 10 分钟
val logins_12 = sqlContext.read.parquet("events/2015/12/*/login")
val l_12 = logins_12.where("event_data.level >= 90").select(
"pid",
"timestamp",
"event_data.level"
).withColumn("event_date", to_date(logins_12("timestamp"))).drop("timestamp").toDF("pid", "level", "event_date").groupBy("pid", "event_date").agg(Map("level"->"max")).toDF("pid", "event_date", "level")
l_12.first()
我的 spark 运行 在两个节点集群中,每个集群有 8 个核心和 16Gb 内存,scala 输出让我认为计算只在一个线程中运行:
scala> x.first()
[Stage 1:=======> (50 + 1) / 368]
当我尝试使用 count() 而不是 first() 时,看起来好像有两个线程在进行计算。这仍然比我预期的要少,因为有大约 30 个文件可以并行处理
scala> l_12.count()
[Stage 4:=====> (34 + 2) / 368]
我在 yarn-client 模式下启动 spark 控制台,14g 用于执行程序,4g 用于驱动程序
./bin/spark-shell -Dspark.executor.memory=14g -Dspark.driver.memory=4g --master yarn-client
我的 spark 默认配置:
spark.executor.memory 2g
spark.logConf true
spark.eventLog.dir maprfs:///apps/spark
spark.eventLog.enabled true
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni
spark.executor.extraClassPath
spark.yarn.historyServer.address http://test-01:18080
rdd有200个分区
scala> logins_12.rdd.partitions.size
res2: Int = 368
scala> l_12.rdd.partitions.size
res0: Int = 200
有没有办法优化这段代码? 谢谢
这两种行为或多或少都在意料之中。 Spark 相当懒惰,它不仅不会执行转换,除非你触发一个动作,而且如果不需要输出,它也可以跳过任务。由于 first
只需要一个元素,因此它只能计算一个分区。这很可能是您有时只看到一个 运行 线程的原因。
关于第二个问题,很可能是配置问题。假设 YARN 配置没有问题(我不使用 YARN,但 yarn.nodemanager.resource.cpu-vcores
看起来可能是问题的根源),这很可能是 Spark 默认设置的问题。正如您在 Configuration guide 中所读,Yarn 上的 spark.executor.cores
默认设置为 1。两个工作人员提供两个 运行 线程。