Spark GraphX : requirement failed: Invalid initial capacity

I am new to Spark and Scala.

For a hobby project, I am trying to run triangle counting on this dataset: DataSet

Here is the code I have written so far:

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.Edge
    import org.apache.spark.graphx.Graph
    import org.apache.spark.graphx.Graph.graphToGraphOps
    import org.apache.spark.graphx.PartitionStrategy
    import org.apache.spark.rdd.RDD
    import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

    object GraphXApps {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("GraphXApps")
          .setSparkHome(System.getenv("SPARK_HOME"))
          .setJars(SparkContext.jarOfClass(this.getClass).toList)

        val sc = new SparkContext(conf)

        // Load the edges in canonical order and partition the graph for triangle count
        val edges: RDD[Edge[String]] =
          sc.textFile(args(0)).map { line =>
            val fields = line.split("\t")
            Edge(fields(0).toLong, fields(1).toLong)
          }

        val graph: Graph[String, String] = Graph
          .fromEdges(edges.sortBy(_.srcId, ascending = true, 1), "defaultProperty")
          .partitionBy(PartitionStrategy.RandomVertexCut)

        // Find the triangle count for each vertex
        val triCounts = graph.triangleCount().vertices
        val triCountById = graph.vertices.join(triCounts).map(_._2._2)

        // Print the result
        println(triCountById.collect().mkString("\n"))

        sc.stop()
      }
    }

But I am getting this error: java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity

Please let me know where I am going wrong. Any help would be much appreciated.

Full stack trace:

16/10/31 01:03:08 ERROR TaskSetManager: Task 0 in stage 8.0 failed 1 times; aborting job
16/10/31 01:03:08 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 
16/10/31 01:03:08 INFO TaskSchedulerImpl: Cancelling stage 8
16/10/31 01:03:08 INFO DAGScheduler: ShuffleMapStage 8 (mapPartitions at VertexRDDImpl.scala:245) failed in 0.131 s
16/10/31 01:03:08 INFO DAGScheduler: Job 0 failed: collect at GraphXApps.scala:47, took 3.128921 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8, localhost): java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun.apply(TriangleCount.scala:70)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun.apply(TriangleCount.scala:69)
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues.apply(VertexRDDImpl.scala:102)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues.apply(VertexRDDImpl.scala:102)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun.apply(VertexRDDImpl.scala:156)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun.apply(VertexRDDImpl.scala:154)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:893)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
    at GraphXApps$.main(GraphXApps.scala:47)
    at GraphXApps.main(GraphXApps.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun.apply(TriangleCount.scala:70)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun.apply(TriangleCount.scala:69)
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues.apply(VertexRDDImpl.scala:102)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues.apply(VertexRDDImpl.scala:102)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun.apply(VertexRDDImpl.scala:156)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun.apply(VertexRDDImpl.scala:154)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This appears to be a bug in Spark 2.0 (tested so far against 2.0, 2.0.1, and 2.0.2). The Jira [SPARK-18200]: GraphX Invalid initial capacity when running triangleCount was created to address this issue.
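
For what it's worth, if I read the Jira correctly, the eventual fix was to let OpenHashSet accept an initial capacity of zero, which suggests the crash is triggered by vertices that end up with an empty neighbor set (for example, vertices whose only edges are self-loops, which triangleCount strips internally). If your data contains such vertices, one possible mitigation is to drop self-loops before building the graph. This is an untested sketch of mine, not a confirmed workaround, mirroring the edge-loading code from the question:

    // Untested sketch: filter out self-loops (a -> a) before building the
    // graph, so no vertex is left with an empty neighbor set after GraphX's
    // internal self-edge removal.
    val edges: RDD[Edge[String]] =
      sc.textFile(args(0)).map { line =>
        val fields = line.split("\t")
        Edge(fields(0).toLong, fields(1).toLong)
      }.filter(e => e.srcId != e.dstId)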

As noted in the linked notebook, your code should work with Spark 1.6.

But as you observed, it fails on Spark 2.0, as shown in the linked notebook.

In the meantime, please try Spark 1.6 or use GraphFrames instead, as described in the linked notebook.
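
For reference, here is a minimal GraphFrames sketch of the same triangle count. It is an illustration under assumptions, not code from the notebook: it assumes the graphframes package is on the classpath (e.g. added via --packages when submitting) and that the input is tab-separated src/dst pairs as in your GraphX code.

    import org.apache.spark.sql.SparkSession
    import org.graphframes.GraphFrame

    object GraphFramesTriangles {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("GraphFramesTriangles").getOrCreate()
        import spark.implicits._

        // Parse "src<TAB>dst" lines into an edges DataFrame. GraphFrame.fromEdges
        // expects "src" and "dst" columns and derives the vertices from them.
        val edgesDF = spark.read.textFile(args(0))
          .map { line =>
            val fields = line.split("\t")
            (fields(0).toLong, fields(1).toLong)
          }
          .toDF("src", "dst")

        val g = GraphFrame.fromEdges(edgesDF)

        // triangleCount.run() returns the vertices with an added "count" column
        g.triangleCount.run().show()

        spark.stop()
      }
    }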

HTH!