在 GraphX 中计算图形直径的正确方法是什么

What is the proper way to compute graph diameter in GraphX

我正在 GraphX 上实现一个算法,为此我还需要计算一些相对较小的图形的直径。 问题是 GraphX 没有任何无向图的概念,所以当使用 ShortestPaths 的内置方法时,它显然会得到短有向路径。这对计算图形直径没有太大帮助(任何节点对之间的最长最短无向路径)。

我想复制图形的边(而不是 |E| 我会有 2|E| 边)但我觉得这不是正确的方法。那么,有没有更好的方法在 GraphX 上做到这一点?

这是我的有向图代码:

// computing the query diameter
def getDiameter(graph: Graph[String, Int]):Long = {
    // Get ids of vertices of the graph 
    val vIds = graph.vertices.collect.toList.map(_._1) 
    // Compute list of shortest paths for every vertex in the graph
    val shortestPaths  = lib.ShortestPaths.run(graph, vIds).vertices.collect
    // extract only the distance values from a list of tuples <VertexId, Map> where map contains <key, value>: <dst vertex, shortest directed distance>
    val values = shortestPaths.map(element => element._2).map(element => element.values)

    // diamter is the longest shortest undirected distance between any pair of nodes in te graph
    val diameter  = values.map(m => m.max).max
    diameter 
}

GraphX 实际上没有方向的概念,你不告诉它使用它。 如果您查看 ShortestPaths 库的内部工作原理,您会发现它使用 Pregel 并且方向是默认的 (EdgeDirection.Either)。这意味着对于所有三元组,它会将源和目标都添加到活动集中。 但是,如果您在 PregelsendMsg 函数中指定仅将 srcId 保留在活动集中(就像在 ShortestPaths 库中发生的那样),某些顶点(只有出边)将不会重新评估。

无论如何,解决方案是编写自己的 Diameter object/library,可能看起来像这样(主要基于 ShortestPath,所以也许有更好的解决方案?)

object Diameter extends Serializable {
  type SPMap = Map[VertexId, Int]

  def makeMap(x: (VertexId, Int)*) = Map(x: _*)

  def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

  def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
    }(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]]
  }

  // Removed landmarks, since all paths have to be taken in consideration
  def run[VD, ED: ClassTag](graph: Graph[VD, ED]): Int = {
    val spGraph = graph.mapVertices { (vid, _) => makeMap(vid -> 0)  }

    val initialMessage:SPMap = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      // added the concept of updating the dstMap based on the srcMap + 1
      val newSrcAttr = incrementMap(edge.dstAttr)
      val newDstAttr = incrementMap(edge.srcAttr)

      List(
       if (edge.srcAttr != addMaps(newSrcAttr, edge.srcAttr)) Some((edge.srcId, newSrcAttr)) else None,
       if (edge.dstAttr != addMaps(newDstAttr, edge.dstAttr)) Some((edge.dstId, newDstAttr)) else None
      ).flatten.toIterator
    }

    val pregel = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)

    // each vertex will contain map with all shortest paths, so just get first
    pregel.vertices.first()._2.values.max
  }
}

val diameter = Diameter.run(graph)