scala.MatchError on a tuple

After processing some input data, I have an RDD[(String, String, Long)], say input, in hand:

input: org.apache.spark.rdd.RDD[(String, String, Long)] = MapPartitionsRDD[9] at flatMap at <console>:54

The String fields here represent the vertices of a graph, and the Long field is the weight of the edge.

To create a graph from this, if a vertex has not been seen before I first insert it into a map with a unique id; if it has already been encountered, I reuse the vertex id assigned earlier. Essentially, each vertex is assigned a unique id of type Long, and then I want to create the Edges.

Here is what I am doing:

var vertexMap = collection.mutable.Map[String, Long]()
var vid : Long = 0          // global vertex id counter
var srcVid : Long = 0       // source vertex id
var dstVid : Long = 0       // destination vertex id

val graphEdges = input.map {
    case Row(src: String, dst: String, weight: Long) => {
        if (vertexMap.contains(src)) {
            srcVid = vertexMap(src)
            if (vertexMap.contains(dst)) {
                dstVid = vertexMap(dst)
            } else {
                vid += 1   // pick a new vertex id
                vertexMap += (dst -> vid)
                dstVid = vid
            }
            Edge(srcVid, dstVid, weight)
        } else {
            vid += 1
            vertexMap(src) = vid
            srcVid = vid
            if (vertexMap.contains(dst)) {
                dstVid = vertexMap(dst)
            } else {
                vid += 1
                vertexMap(dst) = vid
                dstVid = vid
            }
            Edge(srcVid, dstVid, weight)
        }
    }
}

val graph = Graph.fromEdges(graphEdges, 0)
println("num edges = " + graph.numEdges);
println("num vertices = " + graph.numVertices);

What I see is that

graphEdges has the type RDD[org.apache.spark.graphx.Edge[Long]] and graph has the type Graph[Int,Long]:

graphEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Long]] = MapPartitionsRDD[10] at map at <console>:64
graph: org.apache.spark.graphx.Graph[Int,Long] = org.apache.spark.graphx.impl.GraphImpl@1b48170a

But I get the following error when printing the number of edges and vertices of the graph:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 8.0 failed 1 times, most recent failure: Lost task 1.0 in stage 8.0 (TID 9, localhost, executor driver): scala.MatchError: (vertexA, vertexN, 2000) (of class scala.Tuple3)
        at $anonfun.apply(<console>:64)
        at $anonfun.apply(<console>:64)
        at scala.collection.Iterator$$anon.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.graphx.EdgeRDD$$anonfun.apply(EdgeRDD.scala:107)
        at org.apache.spark.graphx.EdgeRDD$$anonfun.apply(EdgeRDD.scala:105)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:844)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:844)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:334)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1038)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I do not understand where the mismatch is here.

Thanks to @Joe K for the helpful tip. I started using zipWithIndex and the code now looks compact, but the graph instantiation still fails. Here is the updated code:

val vertices = input.map(r => r._1).union(input.map(r => r._2)).distinct.zipWithIndex
val graphEdges = input.map {
    case (src, dst, weight) =>
        Edge(vertices.lookup(src)(0), vertices.lookup(dst)(0), weight)
}
val graph = Graph.fromEdges(graphEdges, 0)
println("num edges = " + graph.numEdges);

So, starting from the original 3-tuples, I take the union of the 1st and 2nd fields (which are the vertices), de-duplicate them, and assign each one a unique id. I then use those ids when creating the edges. However, it fails with the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 23, localhost, executor driver): org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:937)
        at $anonfun.apply(<console>:55)
        at $anonfun.apply(<console>:53)
        at scala.collection.Iterator$$anon.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.graphx.EdgeRDD$$anonfun.apply(EdgeRDD.scala:107)

Any ideas?

This particular error is caused by trying to match the tuple as a Row, which it is not.

Change:

case Row(src: String, dst: String, weight: Long) => {

to just:

case (src, dst, weight) => {
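
With that change, the tuple's elements are bound directly. A minimal demonstration in the Spark shell, assuming the usual SparkContext sc is in scope (the sample values are taken from the error message above):

val demo = sc.parallelize(Seq(("vertexA", "vertexN", 2000L)))
// a tuple pattern matches the elements of a Tuple3 RDD directly
demo.map { case (src, dst, weight) => s"$src -> $dst ($weight)" }.collect()
// matching with `case Row(...)` here would throw scala.MatchError at runtime,
// because the records are instances of scala.Tuple3, not org.apache.spark.sql.Row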

Also, your larger scheme for generating vertex ids will not work. All of the logic inside the map will run in parallel on different executors, each with its own copy of the mutable map.

You should flatMap your edges to get a list of all vertices, then call .distinct.zipWithIndex to assign each vertex a single unique long value, and then join back with the original edges, as in the sketch below.
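
A minimal sketch of that approach, assuming input is the RDD[(String, String, Long)] from the question and that the GraphX imports are in scope:

import org.apache.spark.graphx.{Edge, Graph}

// collect every vertex name, de-duplicate, and assign each one a unique Long id
val vertexIds = input
  .flatMap { case (src, dst, _) => Seq(src, dst) }
  .distinct
  .zipWithIndex                                    // RDD[(String, Long)]

// join the ids back onto the edges: first by source, then by destination
val graphEdges = input
  .map { case (src, dst, weight) => (src, (dst, weight)) }
  .join(vertexIds)                                 // (src, ((dst, weight), srcId))
  .map { case (_, ((dst, weight), srcId)) => (dst, (srcId, weight)) }
  .join(vertexIds)                                 // (dst, ((srcId, weight), dstId))
  .map { case (_, ((srcId, weight), dstId)) => Edge(srcId, dstId, weight) }

val graph = Graph.fromEdges(graphEdges, 0)

Unlike per-record lookup calls, the joins here are ordinary distributed transformations, so they do not trip the nested-RDD restriction described in SPARK-5063.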