How to Parallelize Prim's Algorithm in GraphX
I'm trying to write a parallel version of Prim's algorithm, but I can't quite figure out how to do it with Spark GraphX. I've searched hard for resources, but there aren't many examples of shortest-path-style algorithms implemented in GraphX. I think I need to use divide and conquer: split the graph into subgraphs, compute an MST for each, and then merge the MSTs.
GraphX resource:
http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html#the-property-graph
Parallel Prim's resource:
https://www8.cs.umu.se/kurser/5DV050/VT10/handouts/F10.pdf
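For the divide-and-conquer idea, here is a minimal sketch of one way it could look. Note this swaps Kruskal in for the local step: localKruskal is a hypothetical helper, not part of GraphX, and edgeRDD is assumed to be an RDD[Edge[Int]] like the one built in the code below. Each partition computes its own minimum spanning forest in parallel; by the cycle property, an edge rejected inside a partition closes a cycle of lighter edges and so can never be an MST edge, meaning only the survivors need to be merged:

import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD

// Sequential Kruskal with a union-find map; used per partition and for the final merge.
def localKruskal(edges: Seq[Edge[Int]]): Seq[Edge[Int]] = {
  val parent = scala.collection.mutable.Map[Long, Long]()
  def find(x: Long): Long = {
    val p = parent.getOrElseUpdate(x, x)
    if (p == x) x else { val root = find(p); parent(x) = root; root }
  }
  // scan edges in weight order, keeping only those that join two components
  edges.sortBy(_.attr).filter { e =>
    val (rs, rd) = (find(e.srcId), find(e.dstId))
    if (rs != rd) { parent(rs) = rd; true } else false
  }
}

val candidateEdges = edgeRDD
  .mapPartitions(part => localKruskal(part.toSeq).iterator) // local MSF per partition, in parallel
  .collect()                                                // few edges survive, so merging on the driver is cheap
val mstEdges = localKruskal(candidateEdges)                 // final pass over the union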
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object ParallelPrims {
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Parallel Prims").setMaster("local")
    val sc = new SparkContext(conf)
    val logFile = "NodeData.txt"
    val logData = sc.textFile(logFile, 2).cache()
    // Split off the header line
    val headerAndRows = logData.map(line => line.split(",").map(_.trim))
    val header = headerAndRows.first
    val data = headerAndRows.filter(_(0) != header(0))
    // Parse the number of nodes and edges from the header
    val numNodes = header(0).toInt
    val numEdges = header(1).toInt
    val vertexArray = new Array[(Long, String)](numNodes)
    val edgeArray = new Array[Edge[Int]](numEdges)
    // Create the vertex array: vertex i+1 is labelled "v" + (i+1)
    for (i <- 0 until numNodes) {
      vertexArray(i) = (i.toLong + 1, "v" + (i + 1))
    }
    // Create the edge array from the parsed rows
    val rows = data.collect()
    for (i <- 0 until numEdges) {
      val cols = rows(i)
      edgeArray(i) = Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt)
    }
    // Create the GraphX graph
    val vertexRDD: RDD[(Long, String)] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)
    graph.triplets.take(6).foreach(println)
  }
}
NodeData.txt
4,6
1,2,5
1,3,8
1,4,4
2,3,8
2,4,7
3,4,1
Output
((1,v1),(2,v2),5)
((1,v1),(3,v3),8)
((1,v1),(4,v4),4)
((2,v2),(3,v3),8)
((2,v2),(4,v4),7)
((3,v3),(4,v4),1)
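As a side note, the vertex and edge arrays above may not be necessary: if I'm reading the GraphX API right, Graph.fromEdges can derive the vertex set from the edge list and attach a default vertex attribute. A minimal sketch, assuming the same parsed data RDD (graph2 and named are my own names):

// Build the edges directly from the parsed rows; Graph.fromEdges creates every
// vertex that appears in an edge and gives it the supplied default attribute.
val edges: RDD[Edge[Int]] =
  data.map(cols => Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt))
val graph2: Graph[String, Int] = Graph.fromEdges(edges, defaultValue = "v")
// The "v1", "v2", ... labels can then be restored with a mapVertices pass:
val named = graph2.mapVertices((id, _) => "v" + id)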
Here is my version of Prim's algorithm.
var graph: Graph[String, Int] = ...
// empty RDD to accumulate the MST edges (the triplet type must match the graph: [String, Int])
var MST = sc.parallelize(Array[EdgeTriplet[String, Int]]())
// pick a random starting vertex from the graph
var Vt: RDD[VertexId] = sc.parallelize(Array(graph.pickRandomVertex))
// loop until every vertex is in the Vt set
val vcount = graph.vertices.count
while (Vt.count < vcount) {
  // key Vt by itself so it can be used in inner joins
  val hVt = Vt.map(x => (x, x))
  // key the triplets by source id for the inner join
  val bySrc = graph.triplets.map(triplet => (triplet.srcId, triplet))
  // key the triplets by destination id for the inner join
  val byDst = graph.triplets.map(triplet => (triplet.dstId, triplet))
  // all triplets whose source vertex is in Vt
  val bySrcJoined = bySrc.join(hVt).map(_._2._1)
  // all triplets whose destination vertex is in Vt
  val byDstJoined = byDst.join(hVt).map(_._2._1)
  // union the two RDDs, then subtract the triplets whose source AND destination
  // are both in Vt, leaving only the edges that cross the cut
  val candidates = bySrcJoined.union(byDstJoined).subtract(byDstJoined.intersection(bySrcJoined))
  // find the crossing triplet with the least weight
  val triplet = candidates.sortBy(t => t.attr).first
  // add that triplet to the MST
  MST = MST.union(sc.parallelize(Array(triplet)))
  // decide whether the source or the destination vertex is the new one to add to Vt
  if (!Vt.filter(x => x == triplet.srcId).isEmpty) {
    Vt = Vt.union(sc.parallelize(Array(triplet.dstId)))
  } else {
    Vt = Vt.union(sc.parallelize(Array(triplet.srcId)))
  }
}
// print the final minimum spanning tree
MST.collect.foreach(p => println(p.srcId + " " + p.attr + " " + p.dstId))
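One caveat on the loop above: the join/union/subtract candidate search triggers several shuffles on every iteration. A possible refinement (a sketch I have not benchmarked; vtSet and lightest are my own names) is to broadcast the current Vt set, since it only holds vertex ids, and find the lightest crossing edge in a single pass over the triplets:

// Broadcast the visited-vertex set so the crossing-edge test runs map-side with no join.
val vtSet = sc.broadcast(Vt.collect.toSet)
val lightest = graph.triplets
  .filter(t => vtSet.value.contains(t.srcId) ^ vtSet.value.contains(t.dstId)) // exactly one endpoint inside Vt
  .min()(Ordering.by((t: EdgeTriplet[String, Int]) => t.attr))                // one reduce instead of a full sort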