使用 clojure 的 flambo 中的 zipWithUniqueId()

zipWithUniqueId() in flambo using clojure

我想创建一个 rdd,使每一行都有一个索引。我尝试了以下

给定一个 rdd:

["a" "b" "c"] 

(defn make-row-index [input]
  (let [{:keys [col]} input]
    (swap! @rdd assoc :rdd (-> (:rdd xctx)
                          (f/map #(vector %1 %2 ) (range))))))

期望的输出:

 (["a" 0] ["b" 1] ["c" 2])

我遇到了一个参数错误,因为 f/map 被用作 (f/map rdd fn) 想在 apache spark 中使用 zipWithUniqueId() 但我迷失了如何实现它并且我无法在 flambo 中找到等效的功能。感谢任何建议和帮助。

Apache-spark zip with Index

Map implementation in flambo

谢谢

您可以简单地调用 zipWithIndex 然后调用 map 使用 untuple:

(def rdd (f/parallelize sc ["a" "b" "c"]))
(f/map (.zipWithIndex rdd) f/untuple)

您可以使用 .zipWithUniqueId 完全相同的方式,但结果会与您预期的不同。 zipWithUniqueId 将生成对,但不会对索引字段进行排序。

应该也可以使用 zip,但据我所知它不适用于无限范围。

(def idx (f/parallelize sc (range (f/count rdd))))
(f/map (.zip rdd idx) f/untuple)

当你使用 zip 时你应该小心,虽然一般来说 RDD 应该被认为是一个无序集合,如果涉及到混洗。