Spark 新手,使用 graphx 图进行映射 - NullPointerException

New to Spark, mapping with graphx graphs - NullPointerException

我的目标是从一个普通的完整图中计算多个子图中的三角形。子图由一组常量节点 + 来自 RDD[Long] 的节点定义。我是 spark/graphx 的新手,所以这可能是地图使用不当。 我分享的代码会重现我的错误。

首先,我有一个完整图的子图声明如下

import org.apache.spark.rdd._
import org.apache.spark.graphx._
val nodes: RDD[(VertexId, String)] = sc.parallelize(Array((3L, "3"), (7L, "7"), (5L, "5"), (2L, "2"),(4L,"4")))
val vertices: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "a"), Edge(3L, 5L, "b"), Edge(2L, 5L, "c"), Edge(5L, 7L, "d"), Edge(2L, 7L, "e"),Edge(4L,5L,"f")))
val graph: Graph[String,String] = Graph(nodes, vertices, "z")

val baseNodes: Array[Long] = Array(2L,5L,7L)    
val subgraph = graph.subgraph(vpred = (vid,attr)=> baseNodes contains vid)

然后我从图中声明其他节点的 RDD[Long]。

val testNodes: RDD[Long] = sc.parallelize(Array(3L,4L))

我想将每个测试节点添加到子图中并计算测试节点上存在的三角形。

val triangles: RDD[(Long,Int)] = testNodes.map{ newNode =>
  val newNodes: Array[Long] = baseNodes :+ newNode
  val newSubgraph = graph.subgraph(vpred = (vid,attr)=> newNodes contains vid)
  (newNode,findTriangles(7L,newSubgraph))
}
triangles.foreach(x=>x.toString)

如果我在地图函数之外调用它,我的 findTriangles 工作正常。

def findTriangles(id:Long,subgraph:Graph[String,String]): Int = {
  val triCounts = subgraph.triangleCount().vertices
  val count:Int = triCounts.filter{case(item,count)=> {item.toInt == id}}.map{case(item,count)=>count}.first
  count
}
val triangles = findTriangles(7L,subgraph) //1

但是当我 运行 我的 map 函数计算三角形时,我得到一个 NullPointerException。我认为问题在于在映射函数中使用我的图形 val。那是问题吗?有没有办法解决这个问题?

我认为问题应该出在 baseNodes 变量上。在本地声明的变量(例如示例中的 baseNodes)仅在 Spark 驱动程序中可见,在实际执行转换和操作的执行程序中不可见。为避免 NullPointerException,您需要并行化在执行程序上执行的转换(如映射)中需要的任何变量。作为替代方案,如果您拥有的变量是只读的,则可以使用 Spark 中的广播结构将该变量广播给执行程序。在你的例子中,baseNodes 似乎没有在映射操作中被修改,所以它是一个很好的广播而不是并行化的候选者。