spark DAG 中的无关 groupBy
Extraneous groupBy in spark DAG
根据 spark DAG 可视化,在第 0 阶段执行 groupBy
之后,第 1 阶段执行 groupBy
。我的代码中只有一个 groupBy
,并且不会期望我正在做的任何其他转换会导致 groupBy
.
这是代码 (clojure
/ flambo
):
;; stage 0
(-> (.textFile sc path 8192)
(f/map (f/fn [msg] (json/parse-string msg true)))
(f/group-by (f/fn [msg] (:mmsi msg)) 8192)
;; stage 1
(f/map-values (f/fn [values] (sort-by :timestamp (vec values))))
(f/flat-map (ft/key-val-fn (f/fn [mmsi messages]
(let [state-map (atom {}) draught-map (atom {})]
(map #(mk-line % state-map draught-map) (vec messages))))))
(f/map (f/fn [line] (json/generate-string line)))
(f/save-as-text-file path)))
我很清楚第 0 阶段是序列 textFile
、map
、groupBy
,而第 1 阶段是 map-values
、map-values
、flat-map
、map
、saveAsTextFile
,但是第1阶段的groupBy
是从哪里来的呢?
由于 groupBy
会导致洗牌,这在计算上既昂贵又耗时,如果可以的话,我不想要一个无关的洗牌。
这里没有无关的groupBy。 groupBy
是一个 two-step 进程。第一步是从 x
转换为 (f(x), x)
的本地 map
。这是在阶段 0 中表示为 groupBy
块的部分。
第二步是non-local groupByKey
,在Stage 1中标记为groupBy
块,只有这部分需要洗牌。
根据 spark DAG 可视化,在第 0 阶段执行 groupBy
之后,第 1 阶段执行 groupBy
。我的代码中只有一个 groupBy
,并且不会期望我正在做的任何其他转换会导致 groupBy
.
这是代码 (clojure
/ flambo
):
;; stage 0
(-> (.textFile sc path 8192)
(f/map (f/fn [msg] (json/parse-string msg true)))
(f/group-by (f/fn [msg] (:mmsi msg)) 8192)
;; stage 1
(f/map-values (f/fn [values] (sort-by :timestamp (vec values))))
(f/flat-map (ft/key-val-fn (f/fn [mmsi messages]
(let [state-map (atom {}) draught-map (atom {})]
(map #(mk-line % state-map draught-map) (vec messages))))))
(f/map (f/fn [line] (json/generate-string line)))
(f/save-as-text-file path)))
我很清楚第 0 阶段是序列 textFile
、map
、groupBy
,而第 1 阶段是 map-values
、map-values
、flat-map
、map
、saveAsTextFile
,但是第1阶段的groupBy
是从哪里来的呢?
由于 groupBy
会导致洗牌,这在计算上既昂贵又耗时,如果可以的话,我不想要一个无关的洗牌。
这里没有无关的groupBy。 groupBy
是一个 two-step 进程。第一步是从 x
转换为 (f(x), x)
的本地 map
。这是在阶段 0 中表示为 groupBy
块的部分。
第二步是non-local groupByKey
,在Stage 1中标记为groupBy
块,只有这部分需要洗牌。