从 clojure.lang.LazySeq 转换为类型 org.apache.spark.api.java.JavaRDD
Convert from clojure.lang.LazySeq to type org.apache.spark.api.java.JavaRDD
我在 clojure 中开发了一个函数,用于从最后一个非空值填充一个空列,我假设这可行,给定
(:require [flambo.api :as f])
(defn replicate-val
[ rdd input ]
(let [{:keys [ col ]} input
result (reductions (fn [a b]
(if (empty? (nth b col))
(assoc b col (nth a col))
b)) rdd )]
(println "Result type is: "(type result))))
知道了:
;=> "Result type is: clojure.lang.LazySeq"
问题是如何使用 flambo(spark 包装器)将其转换回 JavaRDD 类型
我尝试将 let
形式的 (f/map result #(.toJavaRDD %))
转换为 JavaRDD
类型
我遇到了这个错误
"No matching method found: map for class clojure.lang.LazySeq"
这是预期的,因为结果的类型是 clojure.lang.LazySeq
问题是我如何进行这种转换,或者我如何重构代码以适应这种情况。
这是一个示例输入 rdd:
(type rdd) ;=> "org.apache.spark.api.java.JavaRDD"
但看起来像:
[["04" "2" "3"] ["04" "" "5"] ["5" "16" ""] ["07" "" "36"] ["07" "" "34"] ["07" "25" "34"]]
要求的输出是:
[["04" "2" "3"] ["04" "2" "5"] ["5" "16" ""] ["07" "16" "36"] ["07" "16" "34"] ["07" "25" "34"]]
谢谢。
首先,RDDs 是不可迭代的(不要实现 ISeq
)所以你不能使用 reductions
。忽略访问以前记录的整个想法是相当棘手的。首先,您不能直接访问另一个分区的值。此外,只有不需要改组的转换才会保留顺序。
这里最简单的方法是使用具有明确顺序的数据框和 Window 函数,但据我所知,Flambo 没有实现所需的方法。总是可以使用原始 SQL 或访问 Java/Scala API 但如果你想避免这种情况,你可以尝试以下管道。
首先让我们创建一个广播变量,其中包含每个分区的最新值:
(require '[flambo.broadcast :as bd])
(import org.apache.spark.TaskContext)
(def last-per-part (f/fn [it]
(let [context (TaskContext/get) xs (iterator-seq it)]
[[(.partitionId context) (last xs)]])))
(def last-vals-bd
(bd/broadcast sc
(into {} (-> rdd (f/map-partitions last-per-part) (f/collect)))))
接下来是一些实际工作的帮手:
(defn fill-pair [col]
(fn [x] (let [[a b] x] (if (empty? (nth b col)) (assoc b col (nth a col)) b))))
(def fill-pairs
(f/fn [it] (let [part-id (.partitionId (TaskContext/get)) ;; Get partion ID
xs (iterator-seq it) ;; Convert input to seq
prev (if (zero? part-id) ;; Find previous element
(first xs) ((bd/value last-vals-bd) part-id))
;; Create seq of pairs (prev, current)
pairs (partition 2 1 (cons prev xs))
;; Same as before
{:keys [ col ]} input
;; Prepare mapping function
mapper (fill-pair col)]
(map mapper pairs))))
终于可以使用fill-pairs
到map-partitions
:
(-> rdd (f/map-partitions fill-pairs) (f/collect))
这里隐藏的假设是分区的顺序遵循值的顺序。它可能是也可能不是一般情况,但如果没有明确的命令,它可能是你能得到的最好的。
另一种方法是 zipWithIndex
,交换值的顺序并执行带偏移量的连接。
(require '[flambo.tuple :as tp])
(def rdd-idx (f/map-to-pair (.zipWithIndex rdd) #(.swap %)))
(def rdd-idx-offset
(f/map-to-pair rdd-idx
(fn [t] (let [p (f/untuple t)] (tp/tuple (dec' (first p)) (second p))))))
(f/map (f/values (.rightOuterJoin rdd-idx-offset rdd-idx)) f/untuple)
接下来您可以使用与之前类似的方法进行映射。
编辑
速记on using atoms。问题是什么缺乏参考透明度,并且您正在利用给定实现而不是合同的附带属性。 map
语义中没有任何内容要求按给定顺序处理元素。如果内部实现发生变化,它可能不再有效。使用 Clojure
(defn foo [x] (let [aa @a] (swap! a (fn [&args] x)) aa))
(def a (atom 0))
(map foo (range 1 20))
相比于:
(def a (atom 0))
(pmap foo (range 1 20))
我在 clojure 中开发了一个函数,用于从最后一个非空值填充一个空列,我假设这可行,给定
(:require [flambo.api :as f])
(defn replicate-val
[ rdd input ]
(let [{:keys [ col ]} input
result (reductions (fn [a b]
(if (empty? (nth b col))
(assoc b col (nth a col))
b)) rdd )]
(println "Result type is: "(type result))))
知道了:
;=> "Result type is: clojure.lang.LazySeq"
问题是如何使用 flambo(spark 包装器)将其转换回 JavaRDD 类型
我尝试将 let
形式的 (f/map result #(.toJavaRDD %))
转换为 JavaRDD
类型
我遇到了这个错误
"No matching method found: map for class clojure.lang.LazySeq"
这是预期的,因为结果的类型是 clojure.lang.LazySeq
问题是我如何进行这种转换,或者我如何重构代码以适应这种情况。
这是一个示例输入 rdd:
(type rdd) ;=> "org.apache.spark.api.java.JavaRDD"
但看起来像:
[["04" "2" "3"] ["04" "" "5"] ["5" "16" ""] ["07" "" "36"] ["07" "" "34"] ["07" "25" "34"]]
要求的输出是:
[["04" "2" "3"] ["04" "2" "5"] ["5" "16" ""] ["07" "16" "36"] ["07" "16" "34"] ["07" "25" "34"]]
谢谢。
首先,RDDs 是不可迭代的(不要实现 ISeq
)所以你不能使用 reductions
。忽略访问以前记录的整个想法是相当棘手的。首先,您不能直接访问另一个分区的值。此外,只有不需要改组的转换才会保留顺序。
这里最简单的方法是使用具有明确顺序的数据框和 Window 函数,但据我所知,Flambo 没有实现所需的方法。总是可以使用原始 SQL 或访问 Java/Scala API 但如果你想避免这种情况,你可以尝试以下管道。
首先让我们创建一个广播变量,其中包含每个分区的最新值:
(require '[flambo.broadcast :as bd])
(import org.apache.spark.TaskContext)
(def last-per-part (f/fn [it]
(let [context (TaskContext/get) xs (iterator-seq it)]
[[(.partitionId context) (last xs)]])))
(def last-vals-bd
(bd/broadcast sc
(into {} (-> rdd (f/map-partitions last-per-part) (f/collect)))))
接下来是一些实际工作的帮手:
(defn fill-pair [col]
(fn [x] (let [[a b] x] (if (empty? (nth b col)) (assoc b col (nth a col)) b))))
(def fill-pairs
(f/fn [it] (let [part-id (.partitionId (TaskContext/get)) ;; Get partion ID
xs (iterator-seq it) ;; Convert input to seq
prev (if (zero? part-id) ;; Find previous element
(first xs) ((bd/value last-vals-bd) part-id))
;; Create seq of pairs (prev, current)
pairs (partition 2 1 (cons prev xs))
;; Same as before
{:keys [ col ]} input
;; Prepare mapping function
mapper (fill-pair col)]
(map mapper pairs))))
终于可以使用fill-pairs
到map-partitions
:
(-> rdd (f/map-partitions fill-pairs) (f/collect))
这里隐藏的假设是分区的顺序遵循值的顺序。它可能是也可能不是一般情况,但如果没有明确的命令,它可能是你能得到的最好的。
另一种方法是 zipWithIndex
,交换值的顺序并执行带偏移量的连接。
(require '[flambo.tuple :as tp])
(def rdd-idx (f/map-to-pair (.zipWithIndex rdd) #(.swap %)))
(def rdd-idx-offset
(f/map-to-pair rdd-idx
(fn [t] (let [p (f/untuple t)] (tp/tuple (dec' (first p)) (second p))))))
(f/map (f/values (.rightOuterJoin rdd-idx-offset rdd-idx)) f/untuple)
接下来您可以使用与之前类似的方法进行映射。
编辑
速记on using atoms。问题是什么缺乏参考透明度,并且您正在利用给定实现而不是合同的附带属性。 map
语义中没有任何内容要求按给定顺序处理元素。如果内部实现发生变化,它可能不再有效。使用 Clojure
(defn foo [x] (let [aa @a] (swap! a (fn [&args] x)) aa))
(def a (atom 0))
(map foo (range 1 20))
相比于:
(def a (atom 0))
(pmap foo (range 1 20))