Clojure:Scala/Java Spark Graphx 的互操作问题

Clojure: Scala/Java interop issues for Spark Graphx

我正在尝试通过 Clojure 和 Flambo 使用 Spark/GraphX。

这是我最终得到的代码:

project.clj 文件中:

(defproject spark-tests "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [yieldbot/flambo "0.5.0"]]
  :main ^:skip-aot spark-tests.core
  :target-path "target/%s"
  :checksum :warn
  :profiles {:dev {:aot [flambo.function]}
             :uberjar {:aot :all}
             :provided {:dependencies
                        [[org.apache.spark/spark-core_2.10 "1.3.0"]
                         [org.apache.spark/spark-core_2.10 "1.2.0"]
                         [org.apache.spark/spark-graphx_2.10 "1.2.0"]]}})

然后是我的 Clojure core.clj 文件:

(ns spark-tests.core  
  (:require [flambo.conf :as conf]
            [flambo.api :as f]
            [flambo.tuple :as ft])
  (:import (org.apache.spark.graphx Edge)
           (org.apache.spark.graphx.impl GraphImpl)))

(defonce c (-> (conf/spark-conf)
               (conf/master "local")
               (conf/app-name "flame_princess")))

(defonce sc (f/spark-context c))

(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
                              (ft/tuple 7 ["jgonzal" "postdoc"])
                              (ft/tuple 5 ["franklin" "prof"])]))

(defn edge
  [source dest attr]
  (new Edge (long source) (long dest) attr))

(def relationships (f/parallelize sc [(edge 3 7 "collab")
                                      (edge 5 3 "advisor")]))

(def g (new GraphImpl users relationships))

当我 运行 该代码时,出现以下错误:

1. Caused by java.lang.ClassCastException
   Cannot cast org.apache.spark.api.java.JavaRDD to
   scala.reflect.ClassTag

  Class.java: 3258  java.lang.Class/cast
  Reflector.java:  427  clojure.lang.Reflector/boxArg
  Reflector.java:  460  clojure.lang.Reflector/boxArgs

免责声明:我没有 Scala 知识。

然后我想可能是因为Flamboreturns我们使用f/parallelize的时候是一个JavaRDD。然后我尝试将 JavaRDD 转换为 GraphX 示例中使用的简单 RDD:

(def g (new GraphImpl (.rdd users) (.rdd relationships)))

但是对于 ParallelCollectionRDD class...

我得到了同样的错误

从那里,我知道可能导致此问题的原因。 Java API for the Graph class is here, the Scala API for the same class is here.

我不清楚的是如何在 Clojure 中有效地使用 class 签名:

org.apache.spark.graphx.Graph<VD,ED>

(Graph 是一个抽象的 class,但我在这个例子中尝试使用 GraphImpl)

我想做的是 re-create that Scala example 使用 Clojure。

如有任何提示,我们将不胜感激!

终于做对了(我想)。这是似乎有效的代码:

(ns spark-tests.core
  (:require [flambo.conf :as conf]
            [flambo.api :as f]
            [flambo.tuple :as ft])
  (:import (org.apache.spark.graphx Edge
                                    Graph)
           (org.apache.spark.api.java JavaRDD
                                      StorageLevels)
           (scala.reflect ClassTag$)))

(defonce c (-> (conf/spark-conf)
               (conf/master "local")
               (conf/app-name "flame_princess")))

(defonce sc (f/spark-context c))

(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
                              (ft/tuple 7 ["jgonzal" "postdoc"])
                              (ft/tuple 5 ["franklin" "prof"])]))

(defn edge
  [source dest attr]
  (new Edge (long source) (long dest) attr))

(def relationships (f/parallelize sc [(edge 3 7 "collab")
                                      (edge 5 3 "advisor")
                                      (edge 7 3 "advisor")]))


(def g (Graph/apply (.rdd users)
                    (.rdd relationships)
                    "collab"
                    (StorageLevels/MEMORY_ONLY)
                    (StorageLevels/MEMORY_ONLY)
                    (.apply ClassTag$/MODULE$ clojure.lang.PersistentVector)
                    (.apply ClassTag$/MODULE$ java.lang.String)))

(println (.count (.edges g)))

这段代码 returns 的内容 3 似乎是准确的。主要问题是我没有使用 Graph/Apply 创建 class。事实上,这似乎是创建所有对象的方式(看起来是构造函数......)。我不知道为什么这是什么方式,但这可能是由于我缺乏 Scala 知识。如果有人知道,请告诉我为什么:)

之后我只需要填补 apply 函数签名的空白。

需要注意的是最后两个参数:

  • scala.reflect.ClassTag<VD> evidence
  • scala.reflect.ClassTag<ED> evidence

这是用来指导Scala的vertex attribute type(VD)和edge attribute type(ED ). ED的类型是我作为Edgeclass的第三个参数的对象的类型。那么VD的类型就是tuple函数第二个参数的类型