spark DAG 中的无关 groupBy

Extraneous groupBy in spark DAG

根据 spark DAG 可视化,在第 0 阶段执行 groupBy 之后,第 1 阶段执行 groupBy。我的代码中只有一个 groupBy,并且不会期望我正在做的任何其他转换会导致 groupBy.

这是代码 (clojure / flambo):

;; stage 0
(-> (.textFile sc path 8192)
    (f/map (f/fn [msg] (json/parse-string msg true)))
    (f/group-by (f/fn [msg] (:mmsi msg)) 8192)

;; stage 1
    (f/map-values (f/fn [values] (sort-by :timestamp (vec values))))
    (f/flat-map (ft/key-val-fn (f/fn [mmsi messages]
                                 (let [state-map (atom {}) draught-map (atom {})]
                                   (map #(mk-line % state-map draught-map) (vec messages))))))
  (f/map (f/fn [line] (json/generate-string line)))
  (f/save-as-text-file path)))

我很清楚第 0 阶段是序列 textFilemapgroupBy,而第 1 阶段是 map-valuesmap-valuesflat-mapmapsaveAsTextFile,但是第1阶段的groupBy是从哪里来的呢?

由于 groupBy 会导致洗牌,这在计算上既昂贵又耗时,如果可以的话,我不想要一个无关的洗牌。

这里没有无关的groupBy。 groupBy 是一个 two-step 进程。第一步是从 x 转换为 (f(x), x) 的本地 map。这是在阶段 0 中表示为 groupBy 块的部分。

第二步是non-local groupByKey,在Stage 1中标记为groupBy块,只有这部分需要洗牌。