Spark GraphX : Filtering by passing a vertex value in triplet

I am using Spark 2.1.0 on Windows 10. Since I am new to Spark, I am following this tutorial.

In the tutorial, the author prints the graph's triplets, sorted by flight count (top 10), with the following code:

graph.triplets.sortBy(_.attr, ascending = false).map(triplet =>
  "There were " + triplet.attr.toString + " flights from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(10)

Question: I want to supply an input airport (e.g. ATL) and see all outgoing flights from ATL with their counts, like this:

res60: Array[String] = Array(There were 1388 flights from ATL to LAX.,
There were 1330 flights from ATL to SFO., There were 1283 flights from ATL to HNL., 
There were 1205 flights from ATL to BOS., There were 1229 flights from ATL to LGA., 
There were 1214 flights from ATL to OGG., There were 1173 flights from ATL to LAS., 
There were 1157 flights from ATL to SAN.)

Here is the code:

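// Imports needed by this snippet
import org.apache.spark.graphx.{Edge, Graph}
import scala.util.hashing.MurmurHash3

// Select the airport whose departures we want
val input = "ATL"
// Keep only the edges that leave the selected airport. This assumes, as in the tutorial, that the
// vertex IDs in `graph` are MurmurHash3.stringHash values of the airport codes, so the input is hashed the same way.
val TEMPEdge = graph.edges.filter { case Edge(src, _, _) => src == MurmurHash3.stringHash(input) }
// Build a new graph from the full vertex set and the filtered edges
// (airportVertices and defaultAirport come from the tutorial)
val TEMPGraph = Graph(airportVertices, TEMPEdge, defaultAirport)
// Return the 10 routes with the most flights
TEMPGraph.triplets.sortBy(_.attr, ascending = false).map(triplet =>
  "There were " + triplet.attr.toString + " flights from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(10)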
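To see why hashing the input selects the right edges, here is a Spark-free sketch of the same edge-filter logic using plain Scala collections. It assumes, as the code above does, that vertex IDs are MurmurHash3.stringHash of the airport code. SimpleEdge, vertexId and topDepartures are hypothetical names (not GraphX API), and the map lookup with a "Missing" placeholder stands in for building Graph(airportVertices, TEMPEdge, defaultAirport).

```scala
import scala.util.hashing.MurmurHash3

// Spark-free sketch of the edge-filter approach using plain Scala collections.
// Assumption: vertex IDs are MurmurHash3.stringHash of the airport code.
object EdgeFilterSketch {
  // Hypothetical stand-in for GraphX's Edge(srcId, dstId, attr); attr = flight count.
  final case class SimpleEdge(src: Long, dst: Long, flights: Int)

  // Same hashing as the question's code, widened to Long like GraphX's VertexId.
  def vertexId(code: String): Long = MurmurHash3.stringHash(code).toLong

  // Top-n departures from `input`, formatted like the tutorial's output.
  def topDepartures(airports: Seq[String], edges: Seq[SimpleEdge], input: String, n: Int): Seq[String] = {
    // id -> airport code, playing the role of the vertex RDD
    val names: Map[Long, String] = airports.map(a => vertexId(a) -> a).toMap
    // "Missing" is a placeholder default for IDs with no vertex, like defaultAirport
    def name(id: Long): String = names.getOrElse(id, "Missing")
    val srcId = vertexId(input)
    edges
      .filter(_.src == srcId)                   // keep only edges leaving `input`
      .sortBy(_.flights)(Ordering[Int].reverse) // highest flight count first
      .take(n)
      .map(e => s"There were ${e.flights} flights from ${name(e.src)} to ${name(e.dst)}.")
  }
}
```

Because vertexId("ATL") uses the same hash that produced the vertex IDs, comparing it with each edge's source ID keeps exactly ATL's outgoing edges. If the vertex IDs were built some other way, the input would need the matching lookup instead.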

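Alternatively, we can skip building a new graph and filter the triplets directly. Filter on srcAttr (the departure airport), and do it before sorting so that only the matching triplets are sorted:

graph.triplets.filter(_.srcAttr == input).sortBy(_.attr, ascending = false).map(triplet =>
  "There were " + triplet.attr.toString + " flights from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(3)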
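The choice between srcAttr and dstAttr decides which direction you get: srcAttr keeps flights leaving the input airport, dstAttr keeps flights arriving at it. The following Spark-free sketch models triplets as a plain case class to show both filters side by side. SimpleTriplet, departures and arrivals are hypothetical names, not GraphX API; on a real graph the same predicates would go into graph.triplets.filter.

```scala
// Spark-free sketch of the triplet-filter approach using plain Scala collections.
object TripletFilterSketch {
  // Hypothetical stand-in for GraphX's EdgeTriplet[String, Int]:
  // srcAttr/dstAttr are airport codes, attr is the flight count.
  final case class SimpleTriplet(srcAttr: String, dstAttr: String, attr: Int)

  // Filter first, then sort descending, so only matching triplets are sorted.
  private def top(ts: Seq[SimpleTriplet], keep: SimpleTriplet => Boolean, n: Int): Seq[String] =
    ts.filter(keep)
      .sortBy(_.attr)(Ordering[Int].reverse)
      .take(n)
      .map(t => s"There were ${t.attr} flights from ${t.srcAttr} to ${t.dstAttr}.")

  // Flights leaving `input`: match on the source vertex's attribute.
  def departures(ts: Seq[SimpleTriplet], input: String, n: Int): Seq[String] =
    top(ts, _.srcAttr == input, n)

  // Flights arriving at `input`: match on the destination vertex's attribute.
  def arrivals(ts: Seq[SimpleTriplet], input: String, n: Int): Seq[String] =
    top(ts, _.dstAttr == input, n)
}
```

Running departures and arrivals on the same triplets for "ATL" returns disjoint results (except for a self-loop), which is a quick way to check that the filter points in the intended direction.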