Why does the sortBy transformation trigger a Spark job?
As per the Spark documentation, only RDD actions can trigger a Spark job; transformations are lazily evaluated when an action is called on them.
Yet I see the sortBy transformation being applied immediately, and it shows up as a job trigger in the Spark UI. Why?
sortBy is implemented using sortByKey, which depends on a RangePartitioner (JVM) or a partitioning function (Python). When you call sortBy / sortByKey, the partitioner (partitioning function) is initialized eagerly and samples the input RDD to compute partition boundaries. The job you see corresponds to this process.
The actual sorting is performed only when you execute an action on the newly created RDD or its descendants.
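To make the eager sampling concrete, here is a minimal sketch (assuming a running spark-shell, so sc is available): constructing a RangePartitioner directly, with no sortBy involved, already launches the same kind of sampling job you can watch in the web UI:

scala> import org.apache.spark.RangePartitioner
scala> val pairs = sc.parallelize(0 to 8).map(x => (x, x)) // no job yet: plain transformations
scala> val rp = new RangePartitioner(4, pairs)             // triggers a job that samples keys to estimate partition boundaries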
As per the Spark documentation, only an action triggers a job in Spark; transformations are lazily evaluated when an action is called on them.
Generally speaking you are right, but as you've just experienced, there are a few exceptions, and sortBy is one of them (another is zipWithIndex).
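For comparison, zipWithIndex is eager for a similar reason: when the RDD has more than one partition, it runs a job up front to count the elements of each partition so it can assign global indices. A quick sketch in the same spark-shell session:

scala> sc.parallelize(0 to 8, 4).zipWithIndex() // launches a job to compute per-partition counts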
In fact, it was reported in Spark's JIRA and closed with a Won't Fix resolution. See SPARK-1021 sortByKey() launches a cluster job when it shouldn't.
You can see the job running with DAGScheduler logging enabled (and later in the web UI):
scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25
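Note that this first job only computes the partition boundaries; the shuffle that actually sorts the data runs as a separate job once you call an action on the result. A sketch (same spark-shell session assumed; the exact job numbering depends on what you ran before):

scala> val sorted = sc.parallelize(0 to 8).sortBy(identity) // job: sample keys for partition boundaries
scala> sorted.collect()                                     // job: shuffle + sort, now that an action runs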