DAG 和 Spark 执行
DAG and Spark execution
我试图更好地了解 Spark 的内部结构,但我不确定如何解释作业的结果 DAG。
灵感来自 http://dev.sortable.com/spark-repartition/ 中描述的示例,
我运行下面的代码在Sparkshell中获取了2到200万个素数的列表。
val n = 2000000
val composite = sc.parallelize(2 to n, 8).map(x => (x, (2 to (n / x)))).flatMap(kv => kv._2.map(_ * kv._1))
val prime = sc.parallelize(2 to n, 8).subtract(composite)
prime.collect()
执行后查看了SparkUI,观察了图中的DAG
现在我的问题是:我只调用了一次subtract函数,为什么会出现这个操作
DAG 中的 3 次?
另外,是否有任何教程可以稍微解释一下 Spark 如何创建这些 DAG?
提前致谢。
subtract
是一个需要洗牌的转换:
- 首先,
RDDs
必须使用相同的分区器本地 ("map-side") 重新分区,转换的部分在阶段 0 和阶段 1 中标记为 subtract
。此时两个 RDD 都转换为 (item, null)
对。
substract
你在第 2 阶段看到的发生在洗牌之后,此时 RDD 已经合并。这是过滤项目的地方。
一般来说,任何需要洗牌的操作都会至少分两个阶段执行(取决于先行者的数量),属于每个阶段的任务将单独显示。
我试图更好地了解 Spark 的内部结构,但我不确定如何解释作业的结果 DAG。
灵感来自 http://dev.sortable.com/spark-repartition/ 中描述的示例,
我运行下面的代码在Sparkshell中获取了2到200万个素数的列表。
val n = 2000000
val composite = sc.parallelize(2 to n, 8).map(x => (x, (2 to (n / x)))).flatMap(kv => kv._2.map(_ * kv._1))
val prime = sc.parallelize(2 to n, 8).subtract(composite)
prime.collect()
执行后查看了SparkUI,观察了图中的DAG
现在我的问题是:我只调用了一次subtract函数,为什么会出现这个操作 DAG 中的 3 次? 另外,是否有任何教程可以稍微解释一下 Spark 如何创建这些 DAG? 提前致谢。
subtract
是一个需要洗牌的转换:
- 首先,
RDDs
必须使用相同的分区器本地 ("map-side") 重新分区,转换的部分在阶段 0 和阶段 1 中标记为subtract
。此时两个 RDD 都转换为(item, null)
对。 substract
你在第 2 阶段看到的发生在洗牌之后,此时 RDD 已经合并。这是过滤项目的地方。
一般来说,任何需要洗牌的操作都会至少分两个阶段执行(取决于先行者的数量),属于每个阶段的任务将单独显示。