如何过滤相邻顶点类型的混合节点图
How to filter a mixed-node graph on neighbor vertex types
这个问题是关于 Spark GraphX 的。我想通过删除与某些其他节点相邻的节点来计算子图。
例子
[任务]保留与C2节点不是邻居的A节点和B节点。
输入图表:
┌────┐
┌─────│ A │──────┐
│ └────┘ │
v v
┌────┐ ┌────┐ ┌────┐ ┌────┐
│ C1 │────>│ B │ │ B │<────│ C2 │
└────┘ └────┘ └────┘ └────┘
^ ^
│ ┌────┐ │
└─────│ A │──────┘
└────┘
输出图:
┌────┐
┌─────│ A │
│ └────┘
v
┌────┐
│ B │
└────┘
^
│ ┌────┐
└─────│ A │
└────┘
如何优雅地编写 returns 输出图的 GraphX 查询?
一种解决方案是使用三元组视图来识别作为 C1 节点邻居的 B 节点子集。接下来,将它们与 A 节点合并。接下来,创建一个新图表:
// Step 1
// Compute the subset of B's that are neighbors with C1
val nodesBC1 = graph.triplets .
filter {trip => trip.srcAttr == "C1"} .
map {trip => (trip.dstId, trip.dstAttr)}
// Step 2
// Union the subset B's with all the A's
val nodesAB = nodesBC1 .
union(graph.vertices filter {case (id, label) => label == "A"})
// Step 3
// Create a graph using the subset nodes and all the original edges
// Remove nodes that have null values
val solution1 = Graph(nodesAB, graph.edges) .
subgraph(vpred = {case(id, label) => label != null})
在第 1 步中,我通过将三元组视图的 dstID 和 dstAttr 映射在一起,重新创建了一个节点 RDD(包含 B 节点)。不确定这对于大型图的效率如何?
使用 GraphOps.collectNeighbors
查找 val nodesAB
的不同方法
val nodesAB = graph.collectNeighbors(EdgeDirection.Either)
.filter{case (vid,ns) => ! ns.map(_._2).contains("C2")}.map(_._1)
.intersection(
graph.vertices
.filter{case (vid,attr) => ! attr.toString.startsWith("C") }.map(_._1)
)
其余的工作方式与您相同:
val solution1 = Graph(nodesAB, graph.edges) .
subgraph(vpred = {case(id, label) => label != null})
如果你想使用 DataFrame,它可能(?)更具可扩展性,那么首先我们需要将 nodesAB 变成一个 DataFrame:
val newNodes = sqlContext.createDataFrame(
nodesAB,
StructType(Array(StructField("newNode", LongType, false)))
)
并且您创建了 DataFrame 并以此为基础:
val edgeDf = sqlContext.createDataFrame(
graph.edges.map{edge => Row(edge.srcId, edge.dstId, edge.attr)},
StructType(Array(
StructField("srcId", LongType, false),
StructField("dstId", LongType, false),
StructField("attr", LongType, false)
))
)
然后您可以这样做来创建没有子图的图表:
val solution1 = Graph(
nodesAB,
edgeDf
.join(newNodes, $"srcId" === $"newNode").select($"srcId", $"dstId", $"attr")
.join(newNodes, $"dstId" === $"newNode")
.rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getLong(2)))
)
这是另一种解决方案。此解决方案使用 aggregateMessages 将整数 (1) 发送到应从图中删除的那些 B。生成的顶点集与图形连接,随后的子图调用从输出图形中删除不需要的 B。
// Step 1: send the message (1) to vertices that should be removed
val deleteMe = graph.aggregateMessages[Int](
ctx => {
if (ctx.dstAttr.equals("B") && ctx.srcAttr.equals("C")) {
ctx.sendToDst(1) // 1 means delete, but number is not actually used
}
},
(a,b) => a // choose either message, they are all (1)
)
// Step 2: join vertex sets, original and deleteMe
val joined = graph.outerJoinVertices(deleteMe) {
(id, origValue, msgValue ) => msgValue match {
case Some(number) => "deleteme" // vertex received msg
case None => origValue
}
}
// Step 3: Remove nodes with domain = deleteme
joined.subgraph(vpred = (id, data) => data.equals("deleteme"))
我正在考虑一种只使用一个中间删除标志的方法,例如"deleteme",而不是 1 和 "deleteme"。但这是我到目前为止能做到的好事。
这个问题是关于 Spark GraphX 的。我想通过删除与某些其他节点相邻的节点来计算子图。
例子
[任务]保留与C2节点不是邻居的A节点和B节点。
输入图表:
┌────┐
┌─────│ A │──────┐
│ └────┘ │
v v
┌────┐ ┌────┐ ┌────┐ ┌────┐
│ C1 │────>│ B │ │ B │<────│ C2 │
└────┘ └────┘ └────┘ └────┘
^ ^
│ ┌────┐ │
└─────│ A │──────┘
└────┘
输出图:
┌────┐
┌─────│ A │
│ └────┘
v
┌────┐
│ B │
└────┘
^
│ ┌────┐
└─────│ A │
└────┘
如何优雅地编写 returns 输出图的 GraphX 查询?
一种解决方案是使用三元组视图来识别作为 C1 节点邻居的 B 节点子集。接下来,将它们与 A 节点合并。接下来,创建一个新图表:
// Step 1
// Compute the subset of B's that are neighbors with C1
val nodesBC1 = graph.triplets .
filter {trip => trip.srcAttr == "C1"} .
map {trip => (trip.dstId, trip.dstAttr)}
// Step 2
// Union the subset B's with all the A's
val nodesAB = nodesBC1 .
union(graph.vertices filter {case (id, label) => label == "A"})
// Step 3
// Create a graph using the subset nodes and all the original edges
// Remove nodes that have null values
val solution1 = Graph(nodesAB, graph.edges) .
subgraph(vpred = {case(id, label) => label != null})
在第 1 步中,我通过将三元组视图的 dstID 和 dstAttr 映射在一起,重新创建了一个节点 RDD(包含 B 节点)。不确定这对于大型图的效率如何?
使用 GraphOps.collectNeighbors
val nodesAB
的不同方法
val nodesAB = graph.collectNeighbors(EdgeDirection.Either)
.filter{case (vid,ns) => ! ns.map(_._2).contains("C2")}.map(_._1)
.intersection(
graph.vertices
.filter{case (vid,attr) => ! attr.toString.startsWith("C") }.map(_._1)
)
其余的工作方式与您相同:
val solution1 = Graph(nodesAB, graph.edges) .
subgraph(vpred = {case(id, label) => label != null})
如果你想使用 DataFrame,它可能(?)更具可扩展性,那么首先我们需要将 nodesAB 变成一个 DataFrame:
val newNodes = sqlContext.createDataFrame(
nodesAB,
StructType(Array(StructField("newNode", LongType, false)))
)
并且您创建了 DataFrame 并以此为基础:
val edgeDf = sqlContext.createDataFrame(
graph.edges.map{edge => Row(edge.srcId, edge.dstId, edge.attr)},
StructType(Array(
StructField("srcId", LongType, false),
StructField("dstId", LongType, false),
StructField("attr", LongType, false)
))
)
然后您可以这样做来创建没有子图的图表:
val solution1 = Graph(
nodesAB,
edgeDf
.join(newNodes, $"srcId" === $"newNode").select($"srcId", $"dstId", $"attr")
.join(newNodes, $"dstId" === $"newNode")
.rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getLong(2)))
)
这是另一种解决方案。此解决方案使用 aggregateMessages 将整数 (1) 发送到应从图中删除的那些 B。生成的顶点集与图形连接,随后的子图调用从输出图形中删除不需要的 B。
// Step 1: send the message (1) to vertices that should be removed
val deleteMe = graph.aggregateMessages[Int](
ctx => {
if (ctx.dstAttr.equals("B") && ctx.srcAttr.equals("C")) {
ctx.sendToDst(1) // 1 means delete, but number is not actually used
}
},
(a,b) => a // choose either message, they are all (1)
)
// Step 2: join vertex sets, original and deleteMe
val joined = graph.outerJoinVertices(deleteMe) {
(id, origValue, msgValue ) => msgValue match {
case Some(number) => "deleteme" // vertex received msg
case None => origValue
}
}
// Step 3: Remove nodes with domain = deleteme
joined.subgraph(vpred = (id, data) => data.equals("deleteme"))
我正在考虑一种只使用一个中间删除标志的方法,例如"deleteme",而不是 1 和 "deleteme"。但这是我到目前为止能做到的好事。