使用 clojure 的 flambo 中的 zipWithUniqueId()
zipWithUniqueId() in flambo using clojure
我想创建一个 rdd,使每一行都有一个索引。我尝试了以下
给定一个 rdd:
["a" "b" "c"]
(defn make-row-index [input]
(let [{:keys [col]} input]
(swap! @rdd assoc :rdd (-> (:rdd xctx)
(f/map #(vector %1 %2 ) (range))))))
期望的输出:
(["a" 0] ["b" 1] ["c" 2])
我遇到了一个参数错误,因为 f/map 被用作 (f/map rdd fn)
想在 apache spark 中使用 zipWithUniqueId()
但我迷失了如何实现它并且我无法在 flambo 中找到等效的功能。感谢任何建议和帮助。
谢谢
您可以简单地调用 zipWithIndex
然后调用 map
使用 untuple
:
(def rdd (f/parallelize sc ["a" "b" "c"]))
(f/map (.zipWithIndex rdd) f/untuple)
您可以使用 .zipWithUniqueId
完全相同的方式,但结果会与您预期的不同。 zipWithUniqueId
将生成对,但不会对索引字段进行排序。
应该也可以使用 zip
,但据我所知它不适用于无限范围。
(def idx (f/parallelize sc (range (f/count rdd))))
(f/map (.zip rdd idx) f/untuple)
当你使用 zip
时你应该小心,虽然一般来说 RDD 应该被认为是一个无序集合,如果涉及到混洗。
我想创建一个 rdd,使每一行都有一个索引。我尝试了以下
给定一个 rdd:
["a" "b" "c"]
(defn make-row-index [input]
(let [{:keys [col]} input]
(swap! @rdd assoc :rdd (-> (:rdd xctx)
(f/map #(vector %1 %2 ) (range))))))
期望的输出:
(["a" 0] ["b" 1] ["c" 2])
我遇到了一个参数错误,因为 f/map 被用作 (f/map rdd fn)
想在 apache spark 中使用 zipWithUniqueId()
但我迷失了如何实现它并且我无法在 flambo 中找到等效的功能。感谢任何建议和帮助。
谢谢
您可以简单地调用 zipWithIndex
然后调用 map
使用 untuple
:
(def rdd (f/parallelize sc ["a" "b" "c"]))
(f/map (.zipWithIndex rdd) f/untuple)
您可以使用 .zipWithUniqueId
完全相同的方式,但结果会与您预期的不同。 zipWithUniqueId
将生成对,但不会对索引字段进行排序。
应该也可以使用 zip
,但据我所知它不适用于无限范围。
(def idx (f/parallelize sc (range (f/count rdd))))
(f/map (.zip rdd idx) f/untuple)
当你使用 zip
时你应该小心,虽然一般来说 RDD 应该被认为是一个无序集合,如果涉及到混洗。