GraphX 顶点之间的笛卡尔积

Cartesian product between vertices of a GraphX

我想在图的节点之间做一个笛卡尔积。我想建立他们的距离矩阵。 也许这不是一个很好的方法,所以,欢迎任何建议。

这是我的代码,它不起作用,我没有任何警告或异常,它就是不起作用。我想也许是因为我正在尝试使用 same RDD 制作笛卡尔积,但我不知道如何修复它,如何制作嵌套循环或其他可以帮我计算​​这个矩阵。

val indexes1 = graph.vertices.map(_._1)
val indexes2 = graph.vertices.map(_._1)

val cartesian = indexes1.cartesian(indexes2).cache()
cartesian.map(pair => matrix.updated(pair._1, shortPathBetween(pair._1, pair._2)))

def shortPathBetween(v1:VertexId, v2:VertexId) : Int = {
    val path = ShortestPaths.run(graph, Seq(v2))
    val shortestPath = path.vertices.filter({case (vId, _ ) => vId == v1})
        .first()
        ._2
        .get(v2)

    shortestPath.getOrElse(-1)
}

我采用的方法是使用预凝胶 API。 这允许从每个节点并行遍历图形。 如果您跟踪距离并在使用边权重遍历时更新它们,您最终会得到与每个(可到达的)其他顶点有距离的顶点。

如果你拿这个有向图举例:

您可以像这样在 Spark GraphX 中初始化它:

val graphData = List(
    (0, 0, 1, 10.0),
    (1, 0, 2, 5.0),
    (2, 1, 2, 2.0),
    (3, 1, 3, 1.0),
    (4, 2, 1, 3.0),
    (5, 2, 3, 9.0),
    (6, 2, 4, 2.0),
    (7, 3, 4, 4.0),
    (8, 4, 0, 7.0),
    (9, 4, 3, 5.0)
  ).toDF("id", "from", "to", "distance")

  val vertexRDD: RDD[(Long, Int)] = graphData.flatMap(_.getValuesMap[Int](List("to", "from")).values).distinct().map(i => (i.toLong, i)).rdd
  val edgeRDD: RDD[Edge[Double]] = graphData.map(x => Edge(x.getInt(1), x.getInt(2), x.getDouble(3))).rdd
  val graph: Graph[Int, Double] = Graph(vertexRDD, edgeRDD)

pregel调用需要3个函数

  • vprog 用消息初始化每个顶点(在本例中为空 Map[VertexId, Double] 以跟踪距离)
  • sendMsg 应用于每次迭代的更新步骤(在这种情况下,通过添加边的权重并返回带有消息的迭代器来更新距离以发送到下一次迭代
  • mergeMsg合并两条消息(2个Map[VertexId, Double]合并为1个,保持最短距离)

在代码中这可能是这样的:

def vprog(id: VertexId, orig: Map[VertexId, Double], newly: Map[VertexId, Double]): Map[VertexId, Double] = newly

def mergeMsg(a: Map[VertexId, Double], b: Map[VertexId, Double]): Map[VertexId, Double] = (a.toList ++ b.toList).groupBy(_._1).map{ // mapValues is not serializable :-(
    case (id, v) => id -> v.map(_._2).min // keep shortest distance in case of duplicate
}

def sendMsg(trip: EdgeTriplet[Map[VertexId, Double], Double]): Iterator[(VertexId, Map[VertexId, Double])] = {
    val w = trip.attr // weight of edge from src -> dst
    val distances = trip.dstAttr.mapValues(_ + w) + // update collected distances at dst + edge weight
      (trip.srcId -> 0.0, trip.dstId -> w) // set distance to src to 0  and to dst the edge weight

    // If src contains as much nodes as dst (we traversed all)
    if(trip.srcAttr.keySet.intersect(distances.keySet).size != distances.keySet.size)
      Iterator((trip.srcId, distances))
    else
      Iterator.empty
}

然后运行预凝胶,收集顶点并旋转地图以获得距离矩阵。

val initMap = Map.empty[VertexId, Double]

val result = graph
    .mapVertices((_,_) => initMap)
    .pregel(
      initialMsg = initMap,
      activeDirection = EdgeDirection.Out
    )(vprog, sendMsg, mergeMsg)
    .vertices
    .toDF("id","map")
    .select('id, explode('map))
    .groupBy("id")
    .pivot("key")
    .agg(min("value"))
    .orderBy("id")
    .show(false)

结果会像

+---+----+----+----+----+---+
|id |0   |1   |2   |3   |4  |
+---+----+----+----+----+---+
|0  |0.0 |8.0 |5.0 |11.0|7.0|
|1  |11.0|0.0 |2.0 |1.0 |4.0|
|2  |9.0 |3.0 |0.0 |4.0 |2.0|
|3  |11.0|21.0|16.0|0.0 |4.0|
|4  |7.0 |15.0|12.0|5.0 |0.0|
+---+----+----+----+----+---+

也许有 other/better 种方法,但这在计算上似乎不如将节点之间的最短路径计算为笛卡尔积 ;-)