从成对的 String 生成 VertexId
Generate `VertexId` from pairs of `String`
我正在使用 GraphX 在 Spark 上处理一些图形数据。输入数据为 RDD[(String, String)]
。我使用以下代码片段将 String
映射到 VertexId
并构建图表。
val input: RDD[(String, String)] = ...
val vertexIds = input.map(_._1)
.union(input.map(_._2))
.distinct()
.zipWithUniqueId()
.cache()
val edges = input.join(vertexIds)
.map { case (u, (v, uid)) => (v, uid) }
.join(vertexIds)
.map { case (v, (uid, vid)) => Edge(uid, vid, 1) }
val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges )
当我抽查前1000个最高度节点时,我发现GraphX的结果与原始输入不同。这是我如何转储高度节点
graph.outerJoinVertices(graph.outDegrees) {
(_, vdata, deg) => (deg.getOrElse(0L), vdata)
}.vertices.map(_._2).top(1000).saveTo(....)
我怀疑 .zipWithUniqueId
为每次评估提供了不稳定的 ID。我试过了
- 插入
vertexIds.count()
以强制实现,这样 vertexIds
就不会被重新计算。
- 插入
.sortBy(...).zipWithUniqueId()
以确保顺序相同。
两者都没有解决问题。每个运行.
的top 1000度节点结果略有不同
我找到了两个稳定 String -> VertexId
映射的解决方案:
坚持vertexIds
到FS。
input.map(_._1)
.union(input.map(_._2))
.distinct()
.zipWithUniqueId()
.saveAsObjectFile("some location")
val vertexId = sc.objectFile("some location")
使用抗冲突哈希函数。我使用 Guava 的 murmur3_128 散列并将前 8 个字节作为 vertexId。使用这种方法,您不需要进行任何进一步的连接,效率更高。
我正在使用 GraphX 在 Spark 上处理一些图形数据。输入数据为 RDD[(String, String)]
。我使用以下代码片段将 String
映射到 VertexId
并构建图表。
val input: RDD[(String, String)] = ...
val vertexIds = input.map(_._1)
.union(input.map(_._2))
.distinct()
.zipWithUniqueId()
.cache()
val edges = input.join(vertexIds)
.map { case (u, (v, uid)) => (v, uid) }
.join(vertexIds)
.map { case (v, (uid, vid)) => Edge(uid, vid, 1) }
val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges )
当我抽查前1000个最高度节点时,我发现GraphX的结果与原始输入不同。这是我如何转储高度节点
graph.outerJoinVertices(graph.outDegrees) {
(_, vdata, deg) => (deg.getOrElse(0L), vdata)
}.vertices.map(_._2).top(1000).saveTo(....)
我怀疑 .zipWithUniqueId
为每次评估提供了不稳定的 ID。我试过了
- 插入
vertexIds.count()
以强制实现,这样vertexIds
就不会被重新计算。 - 插入
.sortBy(...).zipWithUniqueId()
以确保顺序相同。
两者都没有解决问题。每个运行.
的top 1000度节点结果略有不同我找到了两个稳定 String -> VertexId
映射的解决方案:
坚持
vertexIds
到FS。input.map(_._1) .union(input.map(_._2)) .distinct() .zipWithUniqueId() .saveAsObjectFile("some location") val vertexId = sc.objectFile("some location")
使用抗冲突哈希函数。我使用 Guava 的 murmur3_128 散列并将前 8 个字节作为 vertexId。使用这种方法,您不需要进行任何进一步的连接,效率更高。