删除 GraphX 中没有出边的顶点

Remove Vertices with no outgoing edges in GraphX

我有一个大图(几百万个顶点和边)。我想删除所有没有出边的顶点(和边)。我有一些有效的代码,但速度很慢,我需要多次执行。我确信我可以使用一些现有的 GraphX 方法来使其更快。

这是我的代码。

val users: RDD[(VertexId, String)] = sc.parallelize(Array((1L, "1"), (2L, "2"), (3L, "3"), (4L, "4")))
  val relationships: RDD[Edge[Double]] = sc.parallelize(
    Array(
      Edge(1L, 3L, 500.0),
      Edge(3L, 2L, 400.0),
      Edge(2L, 1L, 600.0),
      Edge(3L, 1L, 200.0),
      Edge(2L, 4L, 200.0),
      Edge(3L, 4L, 500.0)
    ))

val graph = org.apache.spark.graphx.Graph(users, relationships)

val lst = graph.outDegrees.map(x => x._1).collect
var set:scala.collection.mutable.HashSet[Long] = new scala.collection.mutable.HashSet()
for(a<- lst) {set.add(a)}
var subg = graph.subgraph(vpred = (id, attr) => set.contains(id))
//since vertex 4 has no outgoing edges, subg.edges should return 4 and subg.vertices = 3 

我不知道还有什么方法可以实现。感谢您的帮助!

编辑:我可以用 HashSet 做到这一点,但我认为它仍然可以改进。

对您的代码的第一个优化是让 lst 成为一个集合而不是一个数组,这将使查找 O(1) 而不是 O(n)

但这不是可扩展的,因为您正在收集驱动程序上的所有内容,然后将其发送回执行程序。正确的方法是用 outDegrees 调用 joinVertices 并映射到原始图。

您可以直接用过滤后的顶点定义另一个图。 像这样:

val lst = graph.outDegrees.map(x => x._1).collect
var graph2 = Graph(graph.vertices.filter(v => lst.contains(v)), graph.edges)

如果您不想使用子图,这里是另一种使用三元组来查找那些也是源顶点的目标顶点的方法。

val graph = org.apache.spark.graphx.Graph(users, relationships)
val AsSubjects = graph.triplets.map(triplet => (triplet.srcId,(triplet)))
val AsObjects = graph.triplets.map(triplet => (triplet.dstId,(triplet)))
val ObjectsJoinSubjects = AsObjects.join(AsSubjects)
val ObjectsJoinSubjectsDistinct = ObjectsJoinSubjects.mapValues(x => x._1).distinct()
val NewVertices = ObjectsJoinSubjectsDistinct.map(x => (x._2.srcId, x._2.srcAttr)).distinct()
val NewEdges = ObjectsJoinSubjectsDistinct.map(x => new Edge(x._2.srcId, x._2.dstId, x._2.attr))
val newgraph = Graph(NewVertices,NewEdges)

我不确定这是否提供了对子图的改进,因为我的解决方案使用了昂贵的 distinct()。我用你提供的图表进行了测试,我的解决方案实际上需要更长的时间。但是,我觉得这是一个小例子。因此,我建议您使用更大的图表进行测试,并告诉我们这样做是否更好。

你能不能找到所有零出度顶点。

val zeroOutDeg = graph.filter(graph => {
   val degrees: VertexRDD[Int] = graph.outDegrees
   graph.outerJoinVertices(degrees) {(vid, data, deg => deg.getOrElse(0)}
   }, vpred = (vid: VertexId, deg:Int) => deg == 0)