scala.MatchError on a tuple
After processing some input data, I end up with an RDD[(String, String, Long)], say input, in hand:
input: org.apache.spark.rdd.RDD[(String, String, Long)] = MapPartitionsRDD[9] at flatMap at <console>:54
The String fields here represent the vertices of the graph, and the Long field is the weight of the edge.
To create a graph out of this, I first insert each vertex into a map with a unique id if it is not yet known; if it has already been encountered, I reuse the vertex id assigned to it earlier. Essentially, every vertex gets a unique id of type Long, and then I want to create the edges.
Here is what I am doing:
var vertexMap = collection.mutable.Map[String, Long]()
var vid : Long = 0 // global vertex id counter
var srcVid : Long = 0 // source vertex id
var dstVid : Long = 0 // destination vertex id
val graphEdges = input.map {
    case Row(src: String, dst: String, weight: Long) => {
        if (vertexMap.contains(src)) {
            srcVid = vertexMap(src)
            if (vertexMap.contains(dst)) {
                dstVid = vertexMap(dst)
            } else {
                vid += 1  // pick a new vertex id
                vertexMap += (dst -> vid)
                dstVid = vid
            }
            Edge(srcVid, dstVid, weight)
        } else {
            vid += 1
            vertexMap(src) = vid
            srcVid = vid
            if (vertexMap.contains(dst)) {
                dstVid = vertexMap(dst)
            } else {
                vid += 1
                vertexMap(dst) = vid
                dstVid = vid
            }
            Edge(srcVid, dstVid, weight)
        }
    }
}
val graph = Graph.fromEdges(graphEdges, 0)
println("num edges = " + graph.numEdges);
println("num vertices = " + graph.numVertices);
What I see is that graphEdges is of type RDD[org.apache.spark.graphx.Edge[Long]] and graph is of type Graph[Int,Long]:
graphEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Long]] = MapPartitionsRDD[10] at map at <console>:64
graph: org.apache.spark.graphx.Graph[Int,Long] = org.apache.spark.graphx.impl.GraphImpl@1b48170a
But I get the following error when printing the graph's edge and vertex counts:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 8.0 failed 1 times, most recent failure: Lost task 1.0 in stage 8.0 (TID 9, localhost, executor driver): scala.MatchError: (vertexA, vertexN, 2000) (of class scala.Tuple3)
at $anonfun.apply(<console>:64)
at $anonfun.apply(<console>:64)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.EdgeRDD$$anonfun.apply(EdgeRDD.scala:107)
at org.apache.spark.graphx.EdgeRDD$$anonfun.apply(EdgeRDD.scala:105)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:844)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:844)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I fail to see where the mismatch is here.
Update: Thanks @Joe K for the helpful tip. I started using zipWithIndex and the code now looks compact; however, the graph instantiation still fails. Here is the updated code:
val vertices = input.map(r => r._1).union(input.map(r => r._2)).distinct.zipWithIndex
val graphEdges = input.map {
    case (src, dst, weight) =>
        Edge(vertices.lookup(src)(0), vertices.lookup(dst)(0), weight)
}
val graph = Graph.fromEdges(graphEdges, 0)
println("num edges = " + graph.numEdges);
So, starting from the original 3-tuples, I form a union of the 1st and 2nd fields (which are the vertices), make them distinct, and assign a unique id to each. I then use those ids when creating the edges. However, it fails with the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 23, localhost, executor driver): org.apache.spark.SparkException: This RDD lacks
a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed
inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:937)
at $anonfun.apply(<console>:55)
at $anonfun.apply(<console>:53)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.EdgeRDD$$anonfun.apply(EdgeRDD.scala:107)
Any ideas?
This particular error comes from trying to match a tuple as a Row, which it is not.
Change:
case Row(src: String, dst: String, weight: Long) => {
to just:
case (src, dst, weight) => {
Additionally, your larger plan for generating vertex ids will not work: all of the logic inside the map happens in parallel on different executors, each of which has its own copy of the mutable map.
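Here is a minimal sketch of my own (not from the original post, and assuming a Spark shell with sc in scope) showing why that breaks down: the closure is serialized and shipped to each task, so every task mutates its own deserialized copy of the map.

// Driver-side mutable state, captured by the closure below.
val seen = collection.mutable.Map[String, Long]()
var nextId = 0L

sc.parallelize(Seq("a", "b", "a")).foreach { s =>
  // This mutates a per-task copy of `seen`, not the driver's map.
  if (!seen.contains(s)) { nextId += 1; seen(s) = nextId }
}

// On a cluster the driver's map is still empty here, and two tasks
// can easily hand out the same id to different vertices.
println(seen)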
You should flatMap your edges to get a list of all vertices, then call .distinct.zipWithIndex to assign each vertex a single unique long value. You would then need to re-join with the original edges, as sketched below.
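A minimal sketch of that approach (variable names are mine, building on the input RDD from the question): derive the ids once with distinct.zipWithIndex, then join them back onto the edges instead of calling lookup inside a transformation.

import org.apache.spark.graphx.{Edge, Graph}

// Every vertex name paired with a unique Long id: RDD[(String, Long)]
val vertexIds = input
  .flatMap { case (src, dst, _) => Seq(src, dst) }
  .distinct
  .zipWithIndex

// Join twice to replace the src and dst names with their ids.
val graphEdges = input
  .map { case (src, dst, weight) => (src, (dst, weight)) }
  .join(vertexIds)                              // (src, ((dst, w), srcId))
  .map { case (_, ((dst, weight), srcId)) => (dst, (srcId, weight)) }
  .join(vertexIds)                              // (dst, ((srcId, w), dstId))
  .map { case (_, ((srcId, weight), dstId)) => Edge(srcId, dstId, weight) }

val graph = Graph.fromEdges(graphEdges, 0)
println("num edges = " + graph.numEdges)
println("num vertices = " + graph.numVertices)

Because the joins are ordinary RDD transformations driven from the driver, no RDD is referenced from inside another RDD's closure, which is what triggered the SPARK-5063 error in the updated attempt.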