从成对的 String 生成 VertexId

Generate `VertexId` from pairs of `String`

我正在使用 GraphX 在 Spark 上处理一些图形数据。输入数据为 RDD[(String, String)]。我使用以下代码片段将 String 映射到 VertexId 并构建图表。

val input: RDD[(String, String)] = ...

val vertexIds = input.map(_._1)
                     .union(input.map(_._2))
                     .distinct()
                     .zipWithUniqueId()
                     .cache()

val edges = input.join(vertexIds)
                 .map { case (u, (v, uid)) => (v, uid) }
                 .join(vertexIds)
                 .map { case (v, (uid, vid)) => Edge(uid, vid, 1) }

val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges )

当我抽查前1000个最高度节点时,我发现GraphX的结果与原始输入不同。这是我如何转储高度节点

graph.outerJoinVertices(graph.outDegrees) {
  (_, vdata, deg) => (deg.getOrElse(0L), vdata)
}.vertices.map(_._2).top(1000).saveTo(....)

我怀疑 .zipWithUniqueId 为每次评估提供了不稳定的 ID。我试过了

两者都没有解决问题。每个运行.

的top 1000度节点结果略有不同

我找到了两个稳定 String -> VertexId 映射的解决方案:

  • 坚持vertexIds到FS。

    input.map(_._1)
         .union(input.map(_._2))
         .distinct()
         .zipWithUniqueId()
         .saveAsObjectFile("some location")
    val vertexId = sc.objectFile("some location")
    
  • 使用抗冲突哈希函数。我使用 Guava 的 murmur3_128 散列并将前 8 个字节作为 vertexId。使用这种方法,您不需要进行任何进一步的连接,效率更高。