Spark SQL: Why two jobs for one query?
Experiment

I tried the following code snippet on Spark 1.6.1.
val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") // This has 45 files
soDF.registerTempTable("so")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/")
The Physical Plan is:
== Physical Plan ==
Sort [cnt#59L ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(cnt#59L ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L])
+- TungstenExchange hashpartitioning(dpHour#38,200), None
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L])
+- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder
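For reference, a plan like the one above can be printed without executing the query; a minimal sketch against the same temp table (explain(true) additionally prints the parsed, analyzed and optimized logical plans):

// Show the plans for the aggregation query without running it.
val resultDF = sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt")
resultDF.explain(true)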
For this query I got two jobs: Job 9 and Job 10.

For Job 9, the DAG is:

For Job 10, the DAG is:
Observations

- Obviously, there are two jobs for one query.
- Stage-16 (marked as Stage-14 in Job 9) is skipped in Job 10.
- Stage-15's last RDD[48] is the same as Stage-17's last RDD[49]. How? I saw in the logs that after Stage-15's execution, RDD[48] is registered as RDD[49].
- Stage-17 is shown in the driver-logs but never got executed on the Executors. The driver-logs show task execution, but when I looked at the Yarn container's logs, there was no evidence of any task being received from Stage-17.

Logs supporting these observations (driver-logs only; I lost the executor logs due to a later crash). It can be seen that before Stage-17 starts, RDD[49] is registered:
16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200)
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200)
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB)
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB)
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB)
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26)
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes)
Questions

- Why two Jobs? What is the intention of breaking one DAG into two jobs? Job 10's DAG looks complete for the query execution. Is there anything specific that Job 9 is doing?
- Why is Stage-17 not skipped? It looks like dummy tasks are created; do they serve any purpose?

Later, I tried another, rather simpler query. Unexpectedly, it created 3 jobs.

sqlContext.sql("select dpHour from so order by dphour").write.parquet("/out2/")
When you use the high-level dataframe/dataset APIs, you leave it up to Spark to determine the execution plan, including the job/stage chunking. These depend on many factors such as execution parallelism, cached/persisted data structures, etc. In future versions of Spark, as optimizer sophistication increases, you may even see more jobs per query as, for example, some data sources are sampled to parameterize cost-based execution optimization.
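To see exactly how many scheduler jobs a given statement triggers, one option (a minimal sketch; the job-group name and output path below are just placeholders) is to wrap the statement in a job group and ask the status tracker afterwards:

// Tag everything submitted on this thread with a job group, run the query,
// then ask the status tracker which scheduler jobs ended up in that group.
sc.setJobGroup("agg-query", "dpHour aggregation")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/outCheck/")
val jobIds = sc.statusTracker.getJobIdsForGroup("agg-query")
println(s"Query triggered ${jobIds.length} jobs: ${jobIds.mkString(", ")}")
sc.clearJobGroup()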
For example, I have frequently, but not always, seen writing generate separate jobs from processing that involves shuffles.
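This also lines up with the skipped stage in the observations above: when a later job depends on a shuffle that an earlier job has already materialized, the DAGScheduler reuses the existing shuffle files and marks that stage as skipped instead of recomputing it. A minimal RDD-level sketch of the same effect (made-up data, unrelated to the saleOrder table):

// The second action reuses the shuffle output written by the first,
// so its shuffle map stage shows up as "skipped" in the Spark UI.
val counts = sc.parallelize(1 to 1000000)
  .map(x => (x % 24, 1))
  .reduceByKey(_ + _)   // introduces a shuffle

counts.count()    // first job: runs the shuffle map stage plus a result stage
counts.collect()  // second job: the shuffle map stage is skipped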
Bottom line: if you are using the high-level APIs, unless you absolutely have to do very detailed optimization with huge data volumes, it rarely pays to dig into the specific chunking. Job startup costs are extremely low compared to the processing/output.
If, on the other hand, you are curious about Spark internals, read the optimizer code and engage with the Spark developer mailing list.
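As a lighter-weight starting point than reading the optimizer code, a SparkListener (sketched below, using only the standard scheduler listener API) lets you watch job and stage boundaries as the scheduler creates them:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Print every job the scheduler starts and finishes, with its stage IDs.
sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started, stages: ${jobStart.stageIds.mkString(", ")}")
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
})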