Clojure:Scala/Java Spark Graphx 的互操作问题
Clojure: Scala/Java interop issues for Spark Graphx
我正在尝试通过 Clojure 和 Flambo 使用 Spark/GraphX。
这是我最终得到的代码:
在 project.clj
文件中:
(defproject spark-tests "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[yieldbot/flambo "0.5.0"]]
:main ^:skip-aot spark-tests.core
:target-path "target/%s"
:checksum :warn
:profiles {:dev {:aot [flambo.function]}
:uberjar {:aot :all}
:provided {:dependencies
[[org.apache.spark/spark-core_2.10 "1.3.0"]
[org.apache.spark/spark-core_2.10 "1.2.0"]
[org.apache.spark/spark-graphx_2.10 "1.2.0"]]}})
然后是我的 Clojure core.clj
文件:
(ns spark-tests.core
(:require [flambo.conf :as conf]
[flambo.api :as f]
[flambo.tuple :as ft])
(:import (org.apache.spark.graphx Edge)
(org.apache.spark.graphx.impl GraphImpl)))
(defonce c (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "flame_princess")))
(defonce sc (f/spark-context c))
(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
(ft/tuple 7 ["jgonzal" "postdoc"])
(ft/tuple 5 ["franklin" "prof"])]))
(defn edge
[source dest attr]
(new Edge (long source) (long dest) attr))
(def relationships (f/parallelize sc [(edge 3 7 "collab")
(edge 5 3 "advisor")]))
(def g (new GraphImpl users relationships))
当我 运行 该代码时,出现以下错误:
1. Caused by java.lang.ClassCastException
Cannot cast org.apache.spark.api.java.JavaRDD to
scala.reflect.ClassTag
Class.java: 3258 java.lang.Class/cast
Reflector.java: 427 clojure.lang.Reflector/boxArg
Reflector.java: 460 clojure.lang.Reflector/boxArgs
免责声明:我没有 Scala 知识。
然后我想可能是因为Flambo
returns我们使用f/parallelize
的时候是一个JavaRDD。然后我尝试将 JavaRDD 转换为 GraphX 示例中使用的简单 RDD:
(def g (new GraphImpl (.rdd users) (.rdd relationships)))
但是对于 ParallelCollectionRDD
class...
我得到了同样的错误
从那里,我知道可能导致此问题的原因。 Java API for the Graph class is here, the Scala API for the same class is here.
我不清楚的是如何在 Clojure 中有效地使用 class 签名:
org.apache.spark.graphx.Graph<VD,ED>
(Graph 是一个抽象的 class,但我在这个例子中尝试使用 GraphImpl)
我想做的是 re-create that Scala example 使用 Clojure。
如有任何提示,我们将不胜感激!
终于做对了(我想)。这是似乎有效的代码:
(ns spark-tests.core
(:require [flambo.conf :as conf]
[flambo.api :as f]
[flambo.tuple :as ft])
(:import (org.apache.spark.graphx Edge
Graph)
(org.apache.spark.api.java JavaRDD
StorageLevels)
(scala.reflect ClassTag$)))
(defonce c (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "flame_princess")))
(defonce sc (f/spark-context c))
(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
(ft/tuple 7 ["jgonzal" "postdoc"])
(ft/tuple 5 ["franklin" "prof"])]))
(defn edge
[source dest attr]
(new Edge (long source) (long dest) attr))
(def relationships (f/parallelize sc [(edge 3 7 "collab")
(edge 5 3 "advisor")
(edge 7 3 "advisor")]))
(def g (Graph/apply (.rdd users)
(.rdd relationships)
"collab"
(StorageLevels/MEMORY_ONLY)
(StorageLevels/MEMORY_ONLY)
(.apply ClassTag$/MODULE$ clojure.lang.PersistentVector)
(.apply ClassTag$/MODULE$ java.lang.String)))
(println (.count (.edges g)))
这段代码 returns 的内容 3
似乎是准确的。主要问题是我没有使用 Graph/Apply
创建 class。事实上,这似乎是创建所有对象的方式(看起来是构造函数......)。我不知道为什么这是什么方式,但这可能是由于我缺乏 Scala 知识。如果有人知道,请告诉我为什么:)
之后我只需要填补 apply
函数签名的空白。
需要注意的是最后两个参数:
scala.reflect.ClassTag<VD> evidence
scala.reflect.ClassTag<ED> evidence
这是用来指导Scala的vertex attribute type
(VD)和edge attribute type
(ED ). ED
的类型是我作为Edge
class的第三个参数的对象的类型。那么VD
的类型就是tuple
函数第二个参数的类型
我正在尝试通过 Clojure 和 Flambo 使用 Spark/GraphX。
这是我最终得到的代码:
在 project.clj
文件中:
(defproject spark-tests "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[yieldbot/flambo "0.5.0"]]
:main ^:skip-aot spark-tests.core
:target-path "target/%s"
:checksum :warn
:profiles {:dev {:aot [flambo.function]}
:uberjar {:aot :all}
:provided {:dependencies
[[org.apache.spark/spark-core_2.10 "1.3.0"]
[org.apache.spark/spark-core_2.10 "1.2.0"]
[org.apache.spark/spark-graphx_2.10 "1.2.0"]]}})
然后是我的 Clojure core.clj
文件:
(ns spark-tests.core
(:require [flambo.conf :as conf]
[flambo.api :as f]
[flambo.tuple :as ft])
(:import (org.apache.spark.graphx Edge)
(org.apache.spark.graphx.impl GraphImpl)))
(defonce c (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "flame_princess")))
(defonce sc (f/spark-context c))
(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
(ft/tuple 7 ["jgonzal" "postdoc"])
(ft/tuple 5 ["franklin" "prof"])]))
(defn edge
[source dest attr]
(new Edge (long source) (long dest) attr))
(def relationships (f/parallelize sc [(edge 3 7 "collab")
(edge 5 3 "advisor")]))
(def g (new GraphImpl users relationships))
当我 运行 该代码时,出现以下错误:
1. Caused by java.lang.ClassCastException
Cannot cast org.apache.spark.api.java.JavaRDD to
scala.reflect.ClassTag
Class.java: 3258 java.lang.Class/cast
Reflector.java: 427 clojure.lang.Reflector/boxArg
Reflector.java: 460 clojure.lang.Reflector/boxArgs
免责声明:我没有 Scala 知识。
然后我想可能是因为Flambo
returns我们使用f/parallelize
的时候是一个JavaRDD。然后我尝试将 JavaRDD 转换为 GraphX 示例中使用的简单 RDD:
(def g (new GraphImpl (.rdd users) (.rdd relationships)))
但是对于 ParallelCollectionRDD
class...
从那里,我知道可能导致此问题的原因。 Java API for the Graph class is here, the Scala API for the same class is here.
我不清楚的是如何在 Clojure 中有效地使用 class 签名:
org.apache.spark.graphx.Graph<VD,ED>
(Graph 是一个抽象的 class,但我在这个例子中尝试使用 GraphImpl)
我想做的是 re-create that Scala example 使用 Clojure。
如有任何提示,我们将不胜感激!
终于做对了(我想)。这是似乎有效的代码:
(ns spark-tests.core
(:require [flambo.conf :as conf]
[flambo.api :as f]
[flambo.tuple :as ft])
(:import (org.apache.spark.graphx Edge
Graph)
(org.apache.spark.api.java JavaRDD
StorageLevels)
(scala.reflect ClassTag$)))
(defonce c (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "flame_princess")))
(defonce sc (f/spark-context c))
(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
(ft/tuple 7 ["jgonzal" "postdoc"])
(ft/tuple 5 ["franklin" "prof"])]))
(defn edge
[source dest attr]
(new Edge (long source) (long dest) attr))
(def relationships (f/parallelize sc [(edge 3 7 "collab")
(edge 5 3 "advisor")
(edge 7 3 "advisor")]))
(def g (Graph/apply (.rdd users)
(.rdd relationships)
"collab"
(StorageLevels/MEMORY_ONLY)
(StorageLevels/MEMORY_ONLY)
(.apply ClassTag$/MODULE$ clojure.lang.PersistentVector)
(.apply ClassTag$/MODULE$ java.lang.String)))
(println (.count (.edges g)))
这段代码 returns 的内容 3
似乎是准确的。主要问题是我没有使用 Graph/Apply
创建 class。事实上,这似乎是创建所有对象的方式(看起来是构造函数......)。我不知道为什么这是什么方式,但这可能是由于我缺乏 Scala 知识。如果有人知道,请告诉我为什么:)
之后我只需要填补 apply
函数签名的空白。
需要注意的是最后两个参数:
scala.reflect.ClassTag<VD> evidence
scala.reflect.ClassTag<ED> evidence
这是用来指导Scala的vertex attribute type
(VD)和edge attribute type
(ED ). ED
的类型是我作为Edge
class的第三个参数的对象的类型。那么VD
的类型就是tuple
函数第二个参数的类型