GraphX - Weighted shortest path implementation - java.lang.NoSuchMethodError
Edit - I have found that the book was written for Spark 1.6, while the rest of my stack is on Scala 2.11.
I am attempting to implement the weighted shortest path algorithm from Michael Malak and Robin East's book Spark GraphX in Action. The section in question is Listing 6.4, "Executing the shortest path algorithm that uses breadcrumbs", from Chapter 6 here.
I have my own graph, created from two RDDs, with 344436 vertices and 772983 edges. I can perform an unweighted shortest path computation using the native GraphX library, so I am confident in the graph construction.
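For reference, the unweighted check described above can be done with GraphX's built-in ShortestPaths algorithm, which counts hops and ignores edge weights (which is exactly why a hand-rolled Dijkstra is needed for the weighted case). A sketch, assuming `my_graph` and the SparkContext are set up as in the rest of the question:

```scala
// Sketch: unweighted shortest paths (hop counts) with the built-in GraphX library.
// Assumes `my_graph` already exists; the landmark vertex 1L is illustrative.
import org.apache.spark.graphx.lib.ShortestPaths

// Returns, per vertex, a map from each landmark to the hop count to reach it.
// Edge weights are ignored entirely.
val hopCounts = ShortestPaths.run(my_graph, Seq(1L)).vertices.collect
```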
In this case I use their Dijkstra implementation as follows:
val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).cache()

def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
  var g2 = g.mapVertices(
    (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
  )
  for (i <- 1L to g.vertices.count - 1) {
    val currentVertexId = g2.vertices
      .filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
        (a, b) => if (a._2._2 < b._2._2) a else b)
      )
      ._1
    val newDistances = g2.aggregateMessages[(Double, List[VertexId])](
      ctx => if (ctx.srcId == currentVertexId) {
        ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
      },
      (a, b) => if (a._1 < b._1) a else b
    )
    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
      val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
      (
        vd._1 || vid == currentVertexId,
        math.min(vd._2, newSumVal._1),
        if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
      )
    })
  }
  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
      .productIterator.toList.tail
  ))
}
// Path Finding - random node from which to find all paths
val v1 = 4000000028222916L
I then call their function with my graph and a random vertex ID. Previously I had an issue with v1 not being recognised as a long type; the L suffix resolved that.
val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect
println(results)
However, this returns the following:
Error: Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at GraphX$.dijkstra(GraphX.scala:51)
at GraphX$.main(GraphX.scala:85)
at GraphX.main(GraphX.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Line 51 refers to the line var g2 = g.mapVertices(
Line 85 refers to the line val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect
What method is this exception referring to? I can package with sbt without error, and I cannot see that any method I am calling does not exist.
Error analysis:
Look at the error:
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create
Your program is looking for the create() method and not finding it.
Root cause analysis:
This type of error occurs when there is a Scala version mismatch between your code and Spark, or when the code is compiled with one Scala version and run against another.
Scala version differences:
If you check the Scala 2.10.x source,
https://github.com/scala/scala/blob/2.10.x/src/library/scala/runtime/ObjectRef.java
you will see that the ObjectRef class has:
public T elem;
public ObjectRef(T elem) { this.elem = elem; }
public String toString() { return String.valueOf(elem); }
But if you check Scala 2.11.x,
https://github.com/scala/scala/blob/2.11.x/src/library/scala/runtime/ObjectRef.java
you will see that the ObjectRef class has:
public T elem;
public ObjectRef(T elem) { this.elem = elem; }
@Override
public String toString() { return String.valueOf(elem); }
public static <U> ObjectRef<U> create(U e) { return new ObjectRef<U>(e); } // Your program wants this method, but is not getting it.
public static ObjectRef<Object> zero() { return new ObjectRef<Object>(null); }
The create() method is only available there.
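This also explains why the stack trace points at line 51, var g2 = g.mapVertices(: a local var of a reference type that is mutated from a closure is boxed by the Scala compiler into a scala.runtime.ObjectRef, and on 2.11 the generated bytecode calls the static factory ObjectRef.create, which the 2.10 runtime lacks. A minimal plain-Scala sketch of such a captured var (no Spark needed, names are illustrative):

```scala
object CapturedVarDemo {
  def main(args: Array[String]): Unit = {
    // A local `var` of a reference type mutated from a closure is compiled
    // into a scala.runtime.ObjectRef box (just like `g2` inside dijkstra),
    // so the closure and the enclosing method share one mutable cell.
    var acc = List[Int]()
    (1 to 3).foreach { i => acc = i :: acc } // closure mutates the captured var
    println(acc) // List(3, 2, 1)
  }
}
```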
Solution #1:
Make sure that compilation and runtime both use the same Scala version.
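Concretely, a hypothetical build.sbt sketch for Solution #1 (the exact version numbers are illustrative; pick the Spark line that matches the Scala runtime on your cluster):

```scala
// Illustrative build.sbt: keep scalaVersion aligned with the Scala line your
// Spark distribution was built against (2.10.x for Spark 1.6, 2.11.x for Spark 2.x),
// and let %% select the matching _2.10 / _2.11 artifact suffix automatically.
scalaVersion := "2.10.6" // must match the Scala runtime used by spark-submit

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-graphx" % "1.6.3" % "provided"
)
```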
Solution #2:
Also note that Robert Horvick said the same in the comments:
Spark 1.6 is based on Scala 2.10 not 2.11
So you can update your Spark version from http://spark.apache.org/downloads.html. The latest stable release is Apache Spark 2.0.1, released October 3, 2016.
The problem is not a version mismatch or a missing implementation, but a misleading compiler error.
Here is the thing: after investigating the code I noticed that the following section contains an extra closing parenthesis:
val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
.fold((0L, (false, Double.MaxValue, List[VertexId]())))(
(a, b) => if (a._2._2 < b._2._2) a else b))._1
^
|
You just need to remove the extra parenthesis and it runs perfectly. Here is the full code:
// scala> :pa
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.graphx._
def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
var g2 = g.mapVertices(
(vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
)
for (i <- 1L to g.vertices.count - 1) {
val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
.fold((0L, (false, Double.MaxValue, List[VertexId]())))(
(a, b) => if (a._2._2 < b._2._2) a else b)._1
val newDistances: VertexRDD[(Double, List[VertexId])] =
g2.aggregateMessages[(Double, List[VertexId])](
ctx => if (ctx.srcId == currentVertexId) {
ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
},
(a, b) => if (a._1 < b._1) a else b
)
g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
(
vd._1 || vid == currentVertexId,
math.min(vd._2, newSumVal._1),
if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
)
})
}
g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
(vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
.productIterator.toList.tail
))
}
// Path Finding - random node from which to find all paths
Now, let's test it:
val myVertices: RDD[(VertexId, String)] = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val myEdges: RDD[Edge[Double]] = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0), Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0), Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
val my_graph = Graph(myVertices, myEdges).cache()
val v1 = 4000000028222916L
val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect
// [CTRL-D]
// Exiting paste mode, now interpreting.
// [Lscala.Tuple2;@668a0785
// import org.apache.spark.graphx._
// myVertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = ParallelCollectionRDD[556] at makeRDD at <console>:37
// myEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = ParallelCollectionRDD[557] at makeRDD at <console>:39
// my_graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@49ea0d90
// dijkstra: [VD](g: org.apache.spark.graphx.Graph[VD,Double], origin: org.apache.spark.graphx.VertexId)org.apache.spark.graphx.Graph[(VD, List[Any]),Double]
// v1: Long = 4000000028222916
// results: Array[(String, List[Any])] = Array((A,List(0.0, List())), (B,List(7.0, List(1))), (C,List(15.0, Li...
scala> results.foreach(println)
// (A,List(0.0, List()))
// (B,List(7.0, List(1)))
// (C,List(15.0, List(1, 2)))
// (D,List(5.0, List(1)))
// (E,List(14.0, List(1, 2)))
// (F,List(11.0, List(1, 4)))
// (G,List(22.0, List(1, 4, 6)))
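As a sanity check independent of Spark, the same seven-vertex test graph can be run through a plain-Scala Dijkstra. This is not the book's code, just a hypothetical cross-check that reproduces the distances printed above (A=0, B=7, C=15, D=5, E=14, F=11, G=22):

```scala
object DijkstraCheck {
  // Directed weighted edges from the seven-vertex test graph above.
  val edges = List((1L, 2L, 7.0), (1L, 4L, 5.0), (2L, 3L, 8.0), (2L, 4L, 9.0),
                   (2L, 5L, 7.0), (3L, 5L, 5.0), (4L, 5L, 15.0), (4L, 6L, 6.0),
                   (5L, 6L, 8.0), (5L, 7L, 9.0), (6L, 7L, 11.0))

  // Adjacency list: source -> list of (destination, weight).
  val adj: Map[Long, List[(Long, Double)]] =
    edges.groupBy(_._1).map { case (s, es) => s -> es.map(e => (e._2, e._3)) }
      .withDefaultValue(Nil)

  // Textbook Dijkstra over an in-memory graph.
  def shortest(origin: Long, vertices: Set[Long]): Map[Long, Double] = {
    var dist = vertices.map(v => v -> (if (v == origin) 0.0 else Double.MaxValue)).toMap
    var unvisited = vertices
    while (unvisited.nonEmpty) {
      val u = unvisited.minBy(dist) // closest unvisited vertex
      unvisited -= u
      for ((v, w) <- adj(u) if dist(u) + w < dist(v))
        dist = dist.updated(v, dist(u) + w)
    }
    dist
  }

  def main(args: Array[String]): Unit = {
    val d = shortest(1L, (1L to 7L).toSet)
    println(d.toList.sortBy(_._1).mkString(", "))
    // (1,0.0), (2,7.0), (3,15.0), (4,5.0), (5,14.0), (6,11.0), (7,22.0)
  }
}
```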