How to compute the average degree of neighbors with GraphX
I want to compute the average neighbor degree for every node in a graph. Say we have a graph like this:
    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD

    // Create an RDD for vertices (sc is the SparkContext, e.g. from spark-shell)
    val users: RDD[(VertexId, String)] =
      sc.parallelize(Array((3L, "rxin"),
                           (7L, "jgonzal"),
                           (5L, "franklin"),
                           (2L, "istoica")))
    // Create an RDD for edges
    val relationships: RDD[Edge[Int]] = sc.parallelize(
      Array(Edge(3L, 7L, 12),
            Edge(5L, 3L, 1),
            Edge(2L, 5L, 3),
            Edge(5L, 7L, 5)))
    // Build the initial Graph
    val graph = Graph(users, relationships)
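EDIT
To make the expected result concrete, take node 5 and its neighbors:
- node 3, which has degree = 2
- node 7, which has degree = 2
- node 2, which has degree = 1
The output of this measure for node 5 is simply the average degree of its neighbors: (2+2+1)/3 ≈ 1.667
Ideally you would want to exclude the links to node 5 itself from this computation, but that does not matter to me for now...
END OF EDIT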
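The arithmetic above can be checked for every node without Spark. The following is only a sketch in plain Scala collections that reuses the edge list from the question and treats each edge as undirected; the names `edges`, `adjacency`, `degree` and `avgNeighbourDegree` are made up for this illustration and are not part of GraphX.

```scala
// Edge list copied from the question (edge attributes dropped); no Spark required.
val edges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L))

// Treat every edge as undirected: emit each (vertex, neighbour) pair in both directions,
// then group the neighbours by vertex.
val adjacency: Map[Long, Seq[Long]] =
  edges
    .flatMap { case (src, dst) => Seq(src -> dst, dst -> src) }
    .groupBy(_._1)
    .map { case (v, pairs) => v -> pairs.map(_._2) }

// Degree of a vertex = number of incident edges.
val degree: Map[Long, Int] = adjacency.map { case (v, ns) => v -> ns.size }

// For each vertex, average the degrees of its neighbours.
val avgNeighbourDegree: Map[Long, Double] =
  adjacency.map { case (v, ns) => v -> ns.map(degree).sum.toDouble / ns.size }
```

For this edge list, node 5 comes out at roughly 1.667, node 2 at 3.0, and nodes 3 and 7 at 2.5 each (node 5 itself has degree 3, which is what pulls its neighbors' averages up).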
I am trying to apply aggregateMessages, but I don't know how to retrieve the degree of each node from inside the aggregateMessages call:
    val neideg = g.aggregateMessages[(Long, Double)](
      triplet => {
        // BUT HERE I SHOULD ALSO PASS THE DEGREE
        val comparedAttrs = compareAttrs(triplet.dstAttr, triplet.srcAttr)
        // Each message is an explicit (count, value) tuple
        triplet.sendToDst((1L, comparedAttrs))
        triplet.sendToSrc((1L, comparedAttrs))
      },
      { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) })
    val aveneideg = neideg.mapValues(kv => kv._2 / kv._1.toDouble).toDF("id", "aveneideg")
Then I have a function that computes the sum:
    def compareAttrs(xs: (Int, String), ys: (Int, String)): Double = {
      xs._1.toDouble + ys._1.toDouble
    }
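How can I also pass the degree of those nodes to compareAttrs?
Of course, I would be more than happy to see a simpler and smarter solution for this task than the one I am trying to build...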
I'm not sure this is exactly what you are after, but here is something you could go with:
    val degrees = graph.degrees
    // Now we have a graph where each vertex attribute is that vertex's degree.
    // Vertices without edges are absent from graph.degrees, so default them to 0.
    val graphWithDegrees = graph.outerJoinVertices(degrees) { (_, _, optDegree) =>
      optDegree.getOrElse(0)
    }
    // Each vertex sends its degree to its neighbours; the messages are
    // merged into a list, so each vertex receives the degrees of all
    // of its neighbours.
    val neighboursDegreeAndCount = graphWithDegrees.aggregateMessages[List[Long]](
      sendMsg = triplet => {
        val srcDegree = triplet.srcAttr.toLong
        val dstDegree = triplet.dstAttr.toLong
        triplet.sendToDst(List(srcDegree))
        triplet.sendToSrc(List(dstDegree))
      },
      mergeMsg = (x, y) => x ++ y
    ).mapValues(degrees => degrees.sum / degrees.size.toDouble)
    // If you want the result in the original graph, call outerJoinVertices
    // again; the vertex attribute then becomes the average degree of its
    // neighbours (0.0 for a vertex that has no neighbours).
    graph.outerJoinVertices(neighboursDegreeAndCount) { (_, _, optAvgDegree) =>
      optAvgDegree.getOrElse(0.0)
    }
So for your example the output is: Array((5,1.6666666666666667), (2,3.0), (3,2.5), (7,2.5))
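Collecting every neighbor's degree into a list makes each message grow with the vertex degree. A possible variation, closer to the (count, value) pair used in the question's own attempt, is to send a running (sum, count) pair and divide once at the end. This is a sketch rather than a tested production implementation; the helper name `averageNeighbourDegree` is hypothetical, and it counts parallel edges and self-loops as many times as they occur.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx.{Graph, VertexRDD}

// Hypothetical helper: average degree of each vertex's neighbours.
// Vertices with no edges receive no messages and are absent from the result.
def averageNeighbourDegree[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Double] = {
  // Replace every vertex attribute with the vertex's degree (0 if it has no edges).
  val withDegrees: Graph[Int, ED] =
    graph.outerJoinVertices(graph.degrees) { (_, _, deg) => deg.getOrElse(0) }

  withDegrees.aggregateMessages[(Long, Int)](
    // Each endpoint tells the other endpoint: (my degree, one neighbour).
    ctx => {
      ctx.sendToDst((ctx.srcAttr.toLong, 1))
      ctx.sendToSrc((ctx.dstAttr.toLong, 1))
    },
    // Merge messages by adding the degree sums and the neighbour counts.
    (a, b) => (a._1 + b._1, a._2 + b._2)
  ).mapValues((sumAndCount: (Long, Int)) => sumAndCount._1.toDouble / sumAndCount._2)
}
```

In spark-shell, `averageNeighbourDegree(graph).collect()` on the graph from the question should give the same per-vertex values as the list-based version, though the order of the collected array is not guaranteed.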