将 clojure 向量转换为 flambo sql 行
Convert clojure vector to flambo sql row
我正在开发一个函数,将向量转换为 sql 行,以进一步将其转换为数据框,并使用 Apache spark 中的 SQLcontext 将其保存到 table 中。我正在用 clojure 开发,一路上迷路了。
我想这样实施解决方案:
- 对于每个 rdd(向量)将其转换为行
- 将行转换为数据框
- 将数据帧保存到 table
- 使用 sql上下文查询 table
中的特定信息
以及如何将query的结果再次转为RDD以供进一步分析。
(defn assign-ecom
[]
(let [rdd-fields (-> (:rdd @transformed-rdd)
(f/map #(sql/row->vec %))
f/collect)]
(clojure.pprint/pprint rdd-fields)))
我正在使用 flambo v0.60 api 函数来抽象 Apache-spark 函数,我也欢迎任何关于如何解决问题的建议。谢谢
这是 link 到 Flambo 的行 -> vec 文档:
我假设您已经有 spark-context
(sc
) 和 sql-context
(sql-ctx
)。首先让我们导入我们需要的所有东西:
(import org.apache.spark.sql.RowFactory)
(import org.apache.spark.sql.types.StructType)
(import org.apache.spark.sql.types.StructField)
(import org.apache.spark.sql.types.Metadata)
(import org.apache.spark.sql.types.DataTypes)
对于每个rdd(向量)将其转换为行
;; Vector to Row conversion
(defn vec->row [v]
(RowFactory/create (into-array Object v)))
;; Example data
(def rows (-> (f/parallelize sc [["foo" 1] ["bar" 2]])
(f/map vec->row)))
将行转换为数据框
;; Define schema
(def schema
(StructType.
(into-array StructField
[(StructField. "k" (DataTypes/StringType) false (Metadata/empty))
(StructField. "v" (DataTypes/IntegerType) false (Metadata/empty))])))
;; Create data frame
(def df (.createDataFrame sql-ctx rows schema))
;; See if it works
(.show df)
保存数据帧到table
(.registerTempTable df "df")
使用 sqlContext 查询 table
中的特定信息
(def df-keys (.sql sql-ctx "SELECT UPPER(k) as k FROM df"))
;; Check results
(.show df-keys)
以及如何将query的结果再次转为RDD以供进一步分析。
(.toJavaRDD df-keys)
或者如果你想要矢量:
(f/map (.toJavaRDD df-keys) sql/row->vec)
我正在开发一个函数,将向量转换为 sql 行,以进一步将其转换为数据框,并使用 Apache spark 中的 SQLcontext 将其保存到 table 中。我正在用 clojure 开发,一路上迷路了。 我想这样实施解决方案:
- 对于每个 rdd(向量)将其转换为行
- 将行转换为数据框
- 将数据帧保存到 table
- 使用 sql上下文查询 table 中的特定信息
以及如何将query的结果再次转为RDD以供进一步分析。
(defn assign-ecom [] (let [rdd-fields (-> (:rdd @transformed-rdd) (f/map #(sql/row->vec %)) f/collect)] (clojure.pprint/pprint rdd-fields)))
我正在使用 flambo v0.60 api 函数来抽象 Apache-spark 函数,我也欢迎任何关于如何解决问题的建议。谢谢
这是 link 到 Flambo 的行 -> vec 文档:
我假设您已经有 spark-context
(sc
) 和 sql-context
(sql-ctx
)。首先让我们导入我们需要的所有东西:
(import org.apache.spark.sql.RowFactory)
(import org.apache.spark.sql.types.StructType)
(import org.apache.spark.sql.types.StructField)
(import org.apache.spark.sql.types.Metadata)
(import org.apache.spark.sql.types.DataTypes)
对于每个rdd(向量)将其转换为行
;; Vector to Row conversion (defn vec->row [v] (RowFactory/create (into-array Object v))) ;; Example data (def rows (-> (f/parallelize sc [["foo" 1] ["bar" 2]]) (f/map vec->row)))
将行转换为数据框
;; Define schema (def schema (StructType. (into-array StructField [(StructField. "k" (DataTypes/StringType) false (Metadata/empty)) (StructField. "v" (DataTypes/IntegerType) false (Metadata/empty))]))) ;; Create data frame (def df (.createDataFrame sql-ctx rows schema)) ;; See if it works (.show df)
保存数据帧到table
(.registerTempTable df "df")
使用 sqlContext 查询 table
中的特定信息(def df-keys (.sql sql-ctx "SELECT UPPER(k) as k FROM df")) ;; Check results (.show df-keys)
以及如何将query的结果再次转为RDD以供进一步分析。
(.toJavaRDD df-keys)
或者如果你想要矢量:
(f/map (.toJavaRDD df-keys) sql/row->vec)