在 clojure 中实现 flambo mapValues 函数
Implementing a flambo mapValues function in clojure
我有一个 clojure 函数,它使用 flambo v0.60 函数 api 对样本数据集进行一些分析。我注意到当我使用 (get rdd 2)
而不是获取 rdd 集合中的第二个元素时,它获取的是 rdd 集合第一个元素的第二个字符。我的假设是 clojure 将 rdd 集合的每一行视为一个完整的字符串,而不是一个向量,以便我能够获取集合中的第二个元素。我正在考虑使用 map-values 函数将映射值转换为一个向量,我可以为其获取第二个元素,我试过这个:
(defn split-on-tab-transformation [xctx input]
(assoc xctx :rdd (-> (:rdd xctx)
(spark/map (spark/fn [row] (s/split row #"\t")))
(spark/map-values vec))))
不幸的是我得到一个错误:
java.lang.IllegalArgumentException: No matching method found: mapValues for class org.apache.spark.api.java.JavaRDD...
这是代码 returns rdd 中的第一个集合:
(假设我删除了上面函数
中的(spark/map-values vec)
(defn get-distinct-column-val
"input = {:col val}"
[ xctx input ]
(let [rdds (-> (:rdd xctx)
(f/map (f/fn [row] row))
f/first)]
(clojure.pprint/pprint rdds)))
输出:
[2.00000 770127 200939.000000 \t6094\tBENTONVILLE, AR DPS\t22.500000\t5.000000\t2.500000\t5.000000\t0.000000\t0.000000\t0.000000\t0.000000\t0.000000\t1\tStore Tab\t0.000000\t4.50\t3.83\t5.00\t0.000000\t0.000000\t0.000000\t0.000000\t19.150000]
如果我尝试获取第二个元素770127
(defn get-distinct-column-val
"input = {:col val}"
[ xctx input ]
(let [rdds (-> (:rdd xctx)
(f/map (f/fn [row] row))
f/first)]
(clojure.pprint/pprint (get rdds 1)))
我得到:
[\.]
Flambo documentation for map-values
我是 clojure 的新手,非常感谢任何帮助。谢谢
首先,map-values
(或 Spark API 中的 mapValues
)是仅在 PairRDD 上有效的转换(例如像这样的 [:foo [1 2 3]]
。RDDs with这样的值可以解释为某种映射,其中第一个元素是键,第二个元素是值。
如果你有像这样的 RDD mapValues
会在不更改键的情况下转换值。在这种情况下,您应该使用第二张地图,尽管它似乎已过时,因为 clojure.string/split
已经 returns 一个向量。
一个使用map-values
的简单例子:
(let [pairs [(ft/tuple :foo 1) (ft/tuple :bar 2)]
rdd (f/parallelize-pairs sc pairs) ;; Note parallelize-pairs -> PairRDD
result (-> rdd
(f/map-values inc) ;; Map values
(f/collect))]
(assert (= result [(ft/tuple :foo 2) (ft/tuple :bar 3)])))
根据您的描述,您似乎使用的是输入 RDD 而不是从 split-on-tab-transformation
返回的 RDD。如果我不得不猜测您正在尝试使用原始 xctx
,而不是从 split-on-tab-transformation
返回的那个。由于 Clojure maps
是不可变的 assoc
不会更改传递的参数并且 get-distinct-column-val
接收 RDD[String]
而不是 RDD[Array[String]]
根据命名约定,我假设您想为数组中的单个位置获取不同的值。为了清楚起见,我删除了代码中未使用的部分。首先让我们创建虚拟数据:
(spit "data.txt"
(str "Mazda RX4\t21\t6\t160\n"
"Mazda RX4 Wag\t21\t6\t160\n"
"Datsun 710\t22.8\t4\t108\n"))
添加函数的重写版本
(defn split-on-tab-transformation [xctx]
(assoc xctx :rdd (-> (:rdd xctx)
(f/map #(clojure.string/split % #"\t")))))
(defn get-distinct-column-val
[xctx col]
(-> (:rdd xctx)
(f/map #(get % col))
(f/distinct)))
和结果
(assert
(= #{"Mazda RX4 Wag" "Datsun 710" "Mazda RX4"}
(-> {:sc sc :rdd (f/text-file sc "data.txt")}
(split-on-tab-transformation)
(get-distinct-column-val 0)
(f/collect)
(set))))
我有一个 clojure 函数,它使用 flambo v0.60 函数 api 对样本数据集进行一些分析。我注意到当我使用 (get rdd 2)
而不是获取 rdd 集合中的第二个元素时,它获取的是 rdd 集合第一个元素的第二个字符。我的假设是 clojure 将 rdd 集合的每一行视为一个完整的字符串,而不是一个向量,以便我能够获取集合中的第二个元素。我正在考虑使用 map-values 函数将映射值转换为一个向量,我可以为其获取第二个元素,我试过这个:
(defn split-on-tab-transformation [xctx input]
(assoc xctx :rdd (-> (:rdd xctx)
(spark/map (spark/fn [row] (s/split row #"\t")))
(spark/map-values vec))))
不幸的是我得到一个错误:
java.lang.IllegalArgumentException: No matching method found: mapValues for class org.apache.spark.api.java.JavaRDD...
这是代码 returns rdd 中的第一个集合: (假设我删除了上面函数
中的(spark/map-values vec)
(defn get-distinct-column-val
"input = {:col val}"
[ xctx input ]
(let [rdds (-> (:rdd xctx)
(f/map (f/fn [row] row))
f/first)]
(clojure.pprint/pprint rdds)))
输出:
[2.00000 770127 200939.000000 \t6094\tBENTONVILLE, AR DPS\t22.500000\t5.000000\t2.500000\t5.000000\t0.000000\t0.000000\t0.000000\t0.000000\t0.000000\t1\tStore Tab\t0.000000\t4.50\t3.83\t5.00\t0.000000\t0.000000\t0.000000\t0.000000\t19.150000]
如果我尝试获取第二个元素770127
(defn get-distinct-column-val
"input = {:col val}"
[ xctx input ]
(let [rdds (-> (:rdd xctx)
(f/map (f/fn [row] row))
f/first)]
(clojure.pprint/pprint (get rdds 1)))
我得到:
[\.]
Flambo documentation for map-values
我是 clojure 的新手,非常感谢任何帮助。谢谢
首先,map-values
(或 Spark API 中的 mapValues
)是仅在 PairRDD 上有效的转换(例如像这样的 [:foo [1 2 3]]
。RDDs with这样的值可以解释为某种映射,其中第一个元素是键,第二个元素是值。
如果你有像这样的 RDD mapValues
会在不更改键的情况下转换值。在这种情况下,您应该使用第二张地图,尽管它似乎已过时,因为 clojure.string/split
已经 returns 一个向量。
一个使用map-values
的简单例子:
(let [pairs [(ft/tuple :foo 1) (ft/tuple :bar 2)]
rdd (f/parallelize-pairs sc pairs) ;; Note parallelize-pairs -> PairRDD
result (-> rdd
(f/map-values inc) ;; Map values
(f/collect))]
(assert (= result [(ft/tuple :foo 2) (ft/tuple :bar 3)])))
根据您的描述,您似乎使用的是输入 RDD 而不是从 split-on-tab-transformation
返回的 RDD。如果我不得不猜测您正在尝试使用原始 xctx
,而不是从 split-on-tab-transformation
返回的那个。由于 Clojure maps
是不可变的 assoc
不会更改传递的参数并且 get-distinct-column-val
接收 RDD[String]
而不是 RDD[Array[String]]
根据命名约定,我假设您想为数组中的单个位置获取不同的值。为了清楚起见,我删除了代码中未使用的部分。首先让我们创建虚拟数据:
(spit "data.txt"
(str "Mazda RX4\t21\t6\t160\n"
"Mazda RX4 Wag\t21\t6\t160\n"
"Datsun 710\t22.8\t4\t108\n"))
添加函数的重写版本
(defn split-on-tab-transformation [xctx]
(assoc xctx :rdd (-> (:rdd xctx)
(f/map #(clojure.string/split % #"\t")))))
(defn get-distinct-column-val
[xctx col]
(-> (:rdd xctx)
(f/map #(get % col))
(f/distinct)))
和结果
(assert
(= #{"Mazda RX4 Wag" "Datsun 710" "Mazda RX4"}
(-> {:sc sc :rdd (f/text-file sc "data.txt")}
(split-on-tab-transformation)
(get-distinct-column-val 0)
(f/collect)
(set))))