How can I explain the Apache Spark RDD Lineage Graph?
I have a few questions about the following code:
val input1 = rawinput.map(_.split("\t")).map(x=>(x(6).trim(),x)).sortByKey()
val input2 = input1.map(x=> x._2.mkString("\t"))
val x0 = input2.map(_.split("\t")).map(x => (x(6),x(0)))
val x1 = input2.map(_.split("\t")).map(x => (x(6),x(1)))
val x2 = input2.map(_.split("\t")).map(x => (x(6),x(2)))
val x3 = input2.map(_.split("\t")).map(x => (x(6),x(3)))
val x4 = input2.map(_.split("\t")).map(x => (x(6),x(4)))
val x5 = input2.map(_.split("\t")).map(x => (x(6),x(5)))
val x6 = input2.map(_.split("\t")).map(x => (x(6),x(6)))
val x = x0 union x1 union x2 union x3 union x4 union x5 union x6
<pre>
**Lineage Graph:**
(7) UnionRDD[25] at union at rddCustUtil.scala:78 []
| UnionRDD[24] at union at rddCustUtil.scala:78 []
| UnionRDD[23] at union at rddCustUtil.scala:78 []
| UnionRDD[22] at union at rddCustUtil.scala:78 []
| UnionRDD[21] at union at rddCustUtil.scala:78 []
| UnionRDD[20] at union at rddCustUtil.scala:78 []
| MapPartitionsRDD[7] at map at rddCustUtil.scala:43 []
| MapPartitionsRDD[6] at map at rddCustUtil.scala:43 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
| MapPartitionsRDD[9] at map at rddCustUtil.scala:48 []
| MapPartitionsRDD[8] at map at rddCustUtil.scala:48 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
| MapPartitionsRDD[11] at map at rddCustUtil.scala:53 []
| MapPartitionsRDD[10] at map at rddCustUtil.scala:53 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
| MapPartitionsRDD[13] at map at rddCustUtil.scala:58 []
| MapPartitionsRDD[12] at map at rddCustUtil.scala:58 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
| MapPartitionsRDD[15] at map at rddCustUtil.scala:63 []
| MapPartitionsRDD[14] at map at rddCustUtil.scala:63 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
| MapPartitionsRDD[17] at map at rddCustUtil.scala:68 []
| MapPartitionsRDD[16] at map at rddCustUtil.scala:68 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
| MapPartitionsRDD[19] at map at rddCustUtil.scala:73 []
| MapPartitionsRDD[18] at map at rddCustUtil.scala:73 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
</pre>
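- Can you explain how many shuffle stages will be executed, given that the graph shows ShuffledRDD[4] 7 times?
- Can you give me a detailed explanation of the DAG flow?
- Is this operation expensive?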
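How many shuffle stages will be executed?
Indeed, the shuffle required to sort your data happens 7 times: Spark's evaluation is lazy and runs on demand, so unless the result is cached, it is recomputed for every branch of the DAG that requires it. To solve this (and likely make the computation faster), you can cache (or, more generally, persist) input2 before you use it multiple times: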
val input1 = rawinput.map(_.split("\t")).map(x=>(x(6).trim(),x)).sortByKey()
val input2 = input1.map(x=> x._2.mkString("\t")).cache()
// continue as before
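As a rough analogy for why caching helps, the sketch below uses plain Scala collections instead of Spark. The names `LazyRecompute`, `expensiveSort`, and `uncached` are hypothetical and only stand in for the sort behind input2; a counter records how often that step runs when seven branches each re-derive it, compared with materialising it once up front.

```scala
// Plain Scala analogy only: no Spark involved, all names are hypothetical.
object LazyRecompute {
  // Counts how often the "expensive" step (standing in for the sort) runs.
  var sortRuns = 0

  def expensiveSort(xs: Seq[Int]): Seq[Int] = {
    sortRuns += 1
    xs.sorted
  }

  // A def is re-evaluated on every call, like an unpersisted lineage
  // that each downstream branch has to recompute.
  def uncached(data: Seq[Int]): Seq[Int] = expensiveSort(data)

  // Returns (sort runs without caching, sort runs with caching).
  def run(): (Int, Int) = {
    val data = Seq(3, 1, 2)

    sortRuns = 0
    // Seven branches, each re-deriving the sorted data.
    (0 until 7).foreach(_ => uncached(data).map(_ + 1))
    val withoutCache = sortRuns

    sortRuns = 0
    // Materialise once (the analogue of cache()), then reuse in every branch.
    val cached = expensiveSort(data)
    (0 until 7).foreach(_ => cached.map(_ + 1))
    val withCache = sortRuns

    (withoutCache, withCache)
  }
}
```

Calling `LazyRecompute.run()` returns the two counts. In Spark itself, the cached data is only materialised when the first action runs, and it can still be evicted and recomputed if memory is short.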
Can you give a detailed explanation of the DAG flow?
Each of your x_ RDDs is computed "separately" using the following lineage:
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
| MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
| /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
| /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
| MapPartitionsRDD[9] at map at rddCustUtil.scala:48 []
| MapPartitionsRDD[8] at map at rddCustUtil.scala:48 []
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
This shows the computation that creates rawinput from textFile, followed by the sort and three map operations.
Then, you have 6 union operations combining these 7 RDDs.
Is this operation expensive?
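Yes, it seems it would be. As suggested above, caching can make it faster, but there is a better way to achieve this without splitting the RDD into many separate ones: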
val x = rawinput.map(_.split("\t"))
.keyBy(_(6).trim()) // extract key
.flatMap{ case (k, arr) => arr.take(7).zipWithIndex.map((k, _)) } // flatMap into (key, (value, index))
.sortBy { case (k, (_, index)) => (index, k) } // sort by index first, key second
.map { case (k, (value, _)) => (k, value) } // remove index, it was just used for sorting
This performs a single shuffle operation and does not require persisting the data. The DAG would look like this:
(4) MapPartitionsRDD[9] at map at Test.scala:75 []
| MapPartitionsRDD[8] at sortBy at Test.scala:74 []
| ShuffledRDD[7] at sortBy at Test.scala:74 []
+-(4) MapPartitionsRDD[4] at sortBy at Test.scala:74 []
| MapPartitionsRDD[3] at flatMap at Test.scala:73 []
| MapPartitionsRDD[2] at keyBy at Test.scala:72 []
| MapPartitionsRDD[1] at map at Test.scala:71 []
| ParallelCollectionRDD[0] at parallelize at Test.scala:64 []
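To sanity-check that the single-sort pipeline yields the same pairs, in the same order, as the seven-way union, here is a minimal sketch on plain Scala collections rather than RDDs. The object `SinglePassCheck`, its method names, and the two sample rows are hypothetical. It assumes every row has at least 7 tab-separated columns and that the key column carries no surrounding whitespace.

```scala
// Plain Scala collections stand in for RDDs; all names and data are hypothetical.
object SinglePassCheck {
  // Two made-up tab-separated rows with 7 columns each; column 6 is the key.
  val rawinput: Seq[String] = Seq(
    "a0\ta1\ta2\ta3\ta4\ta5\tk2",
    "b0\tb1\tb2\tb3\tb4\tb5\tk1"
  )

  // Original approach: sort rows by key, project each column, concatenate.
  def viaUnion(lines: Seq[String]): Seq[(String, String)] = {
    val sorted = lines.map(_.split("\t")).sortBy(_(6).trim)
    (0 to 6).flatMap(i => sorted.map(arr => (arr(6), arr(i))))
  }

  // Rewritten approach: tag each value with its column index, then sort once.
  def viaSingleSort(lines: Seq[String]): Seq[(String, String)] =
    lines.map(_.split("\t"))
      .map(arr => (arr(6).trim, arr))                    // extract key
      .flatMap { case (k, arr) =>
        arr.take(7).toSeq.zipWithIndex.map(vi => (k, vi)) // (key, (value, index))
      }
      .sortBy { case (k, (_, index)) => (index, k) }      // index first, key second
      .map { case (k, (value, _)) => (k, value) }         // drop the index
}
```

On this sample, `SinglePassCheck.viaUnion(SinglePassCheck.rawinput)` and `SinglePassCheck.viaSingleSort(SinglePassCheck.rawinput)` compare equal. One difference to keep in mind: the rewritten version emits the trimmed key, whereas the original x0..x6 use x(6) as-is, so the outputs can differ if the key column contains surrounding whitespace.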