GraphX VertexRDD NullPointerException

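I'm trying to pass messages along a graph to compute a recursive feature. I get an error when I define a graph whose vertices are the output of aggregateMessages. Code for context: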

> val newGraph = Graph(newVertices, edges)

newGraph: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@2091594b

//This is the RDD that causes the problem
> val result = newGraph.aggregateMessages[List[Double]](
  {triplet => triplet.sendToDst(triplet.srcAttr)},
  {(a,b) => a.zip(b).map { case (x, y) => x + y }},
  {TripletFields.Src}) 

result: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[1990] at RDD at VertexRDD.scala:57

> result.take(1) 
res121: Array[(org.apache.spark.graphx.VertexId, List[Double])] = Array((1944425548,List(0.0, 0.0, 137.0, 292793.0)))

So far so good, but when I try

> val newGraph2 = Graph(result, edges)

newGraph2: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@710919e1

> val result2 = newGraph2.aggregateMessages[List[Double]](
  {triplet => triplet.sendToDst(triplet.srcAttr)},
  {(a,b) => a.zip(b).map { case (x, y) => x + y }},
  {TripletFields.Src})

> result2.count

I get the following (trimmed) error:

result2: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[2009] at RDD at VertexRDD.scala:57
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4839.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4839.0 (TID 735, 10.0.2.15): java.lang.NullPointerException
    at $anonfun.apply(<console>:62)
    at $anonfun.apply(<console>:62)
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
    at $anonfun.apply(<console>:61)
    at $anonfun.apply(<console>:61)
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$$anonfun$apply.apply(GraphImpl.scala:237)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$$anonfun$apply.apply(GraphImpl.scala:207)
    at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
...
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
...
Caused by: java.lang.NullPointerException
  at $anonfun.apply(<console>:62)
  at $anonfun.apply(<console>:62)
  at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
  at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
  at $anonfun.apply(<console>:61)
  at $anonfun.apply(<console>:61)
  at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$$anonfun$apply.apply(GraphImpl.scala:237)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$$anonfun$apply.apply(GraphImpl.scala:207)
  at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  ... 3 more

I don't think this is a type mismatch error, since aggregateMessages returns a VertexRDD. Any idea why I'm running into this?

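aggregateMessages does not return every vertex in the graph, only the vertices that received a message. The NullPointerException comes from edges in the graph that reference vertices missing from result, combined with the graph being built without a default vertex value. Graph(result, edges) gives those missing vertices a null attribute, and judging by the stack trace the merge function then fails when it calls zip on that null.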
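
One way to avoid this is to pass a default vertex attribute as the third argument to Graph, so vertices that received no message get a real value instead of null. Below is a minimal sketch of that idea, not a drop-in fix: the three-vertex toy graph, the helper name sumFromSources, the local SparkContext, and the zero-filled default are all assumptions made for illustration. The default list must have the same length as the real attributes, because zip truncates to the shorter list.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId}
import org.apache.spark.rdd.RDD

// Local context for the sketch; in spark-shell the existing sc can be used instead.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("default-vertex-attr"))

// Hypothetical toy graph: 1 -> 2 -> 3. Vertex 1 has no incoming edge,
// so it never receives a message.
val vertices: RDD[(VertexId, List[Double])] = sc.parallelize(Seq(
  (1L, List(1.0, 2.0)),
  (2L, List(3.0, 4.0)),
  (3L, List(5.0, 6.0))))
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))

// Same aggregation as in the question, wrapped in a helper (name is illustrative).
def sumFromSources(g: Graph[List[Double], Int]) =
  g.aggregateMessages[List[Double]](
    triplet => triplet.sendToDst(triplet.srcAttr),
    (a, b) => a.zip(b).map { case (x, y) => x + y },
    TripletFields.Src)

// First pass: only vertices 2 and 3 receive messages, so vertex 1 is absent here.
val result = sumFromSources(Graph(vertices, edges))

// Supplying a default fills in vertex 1, which edges still reference.
// Without it, the attribute of vertex 1 would be null.
val defaultAttr = List(0.0, 0.0)
val newGraph2 = Graph(result, edges, defaultAttr)

// Second pass now sees a non-null srcAttr on every edge.
val result2 = sumFromSources(newGraph2)
result2.collect.foreach(println)
```

If the vertices that received no message should keep their previous attribute rather than a constant, an alternative is to join result back onto the original graph with outerJoinVertices and fall back to the old value when the Option is empty.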