将 clojure 向量转换为 flambo sql 行

Convert clojure vector to flambo sql row

我正在开发一个函数,将向量转换为 sql 行,以进一步将其转换为数据框,并使用 Apache spark 中的 SQLcontext 将其保存到 table 中。我正在用 clojure 开发,一路上迷路了。 我想这样实施解决方案:

  1. 对于每个 rdd(向量)将其转换为行
  2. 将行转换为数据框
  3. 将数据帧保存到 table
  4. 使用 sql上下文查询 table
  5. 中的特定信息
  6. 以及如何将query的结果再次转为RDD以供进一步分析。

    (defn assign-ecom 
      []
       (let [rdd-fields (-> (:rdd @transformed-rdd)
                     (f/map #(sql/row->vec %))
                      f/collect)]
         (clojure.pprint/pprint rdd-fields)))
    

我正在使用 flambo v0.60 api 函数来抽象 Apache-spark 函数,我也欢迎任何关于如何解决问题的建议。谢谢

这是 link 到 Flambo 的行 -> vec 文档:

Flambo documentation:

我假设您已经有 spark-context (sc) 和 sql-context (sql-ctx)。首先让我们导入我们需要的所有东西:

(import org.apache.spark.sql.RowFactory)
(import org.apache.spark.sql.types.StructType)
(import org.apache.spark.sql.types.StructField)
(import org.apache.spark.sql.types.Metadata)
(import org.apache.spark.sql.types.DataTypes)
  1. 对于每个rdd(向量)将其转换为行

    ;; Vector to Row conversion
    (defn vec->row [v] 
      (RowFactory/create (into-array Object v)))
    
    ;; Example data
    (def rows (-> (f/parallelize sc [["foo" 1] ["bar" 2]])
                  (f/map vec->row)))
    
  2. 将行转换为数据框

    ;; Define schema
    (def schema
      (StructType.
       (into-array StructField
         [(StructField. "k" (DataTypes/StringType) false (Metadata/empty))
          (StructField. "v" (DataTypes/IntegerType) false (Metadata/empty))])))
    
    ;; Create data frame
    (def df (.createDataFrame sql-ctx rows schema))
    
    ;; See if it works
    (.show df)
    
  3. 保存数据帧到table

    (.registerTempTable df "df")
    
  4. 使用 sqlContext 查询 table

    中的特定信息
    (def df-keys (.sql sql-ctx "SELECT UPPER(k) as k FROM df"))
    ;; Check results
    (.show df-keys)
    
  5. 以及如何将query的结果再次转为RDD以供进一步分析。

    (.toJavaRDD df-keys)
    

    或者如果你想要矢量:

    (f/map (.toJavaRDD df-keys) sql/row->vec)