如何使用 Spark Graphx 打印出最短路径
How can I print out the shortest path by using Spark Graphx
下面的代码运行良好,它打印出两个顶点之间的最短路径长度。但是如何打印出两个顶点之间的真实路径或细节边(不仅是长度)?
val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")
val sc = new SparkContext(conf)
val vertexArray = Array(
(1L, ("Alice1", 28)),
(2L, ("Bob2", 27)),
(3L, ("Charlie3", 65)),
(4L, ("David4", 42)),
(5L, ("Ed5", 55)),
(6L, ("Fran6", 50))
)
val edgeArray = Array(
Edge(2L, 1L, 1),
Edge(2L, 4L, 1),
Edge(3L, 2L, 1),
Edge(3L, 6L, 1),
Edge(4L, 1L, 1),
Edge(5L, 2L, 1),
Edge(5L, 3L, 1),
Edge(5L, 6L, 1)
)
val sourceId : VertexId = 5L;
val initialGraph = graph.mapVertices(
(id,_) => if(id==sourceId)0.0 else Double.PositiveInfinity
)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist),
triplet =>{
if(triplet.srcAttr + triplet.attr < triplet.dstAttr){
Iterator((triplet.dstId, triplet.srcAttr+triplet.attr))
}else{
Iterator.empty
}
},
(a,b) => math.min(a,b)
)
println(sssp.vertices.collect.mkString("\n"))
代码在运行时,输出了VertexId=5和其他顶点的最短路径长度,如下:
(4,2.0)
(1,2.0)
(6,1.0)
(3,1.0)
(5,0.0)
(2,1.0)
例如结果(4,2.0)表示顶点5和顶点4之间的最短路径长度为2。但我希望它能打印出详细路径,如:5->2->4 .
您可以使用 graphFrame。
import org.apache.spark.sql.types.{ArrayType, StringType, StructType, StructField, DoubleType}
import org.graphframes._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.graphframes.lib.Pregel
import org.apache.spark.sql.functions.struct
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.Row
var vertex_df = spark.read.format("csv").option("header", "true").load("/home/tm/Documents/vertex1.csv")
val edge_df = spark.read.format("csv").option("header", "true").load("/home/tm/Documents/edge1.csv")
spark.sparkContext.setCheckpointDir("/home/tm/checkpoints")
var graph = GraphFrame(vertex_df, edge_df)
val inDegrees=graph.inDegrees
val outDegrees=graph.outDegrees
vertex_df = vertex_df.join(outDegrees,outDegrees("id") === vertex_df("id"), "left").join(inDegrees, inDegrees("id") === vertex_df("id"), "left").select(vertex_df("id"), vertex_df("name"), outDegrees("outDegree"), inDegrees("inDegree"))
vertex_df = vertex_df.withColumn("nodeType", when(col("inDegree").isNull,"root").otherwise(when(col("outDegree").isNull,"leaf").otherwise("child")))
graph = GraphFrame(vertex_df, edge_df)
val root_node = 5
val vertColSchema = StructType(
List(
StructField("dist", DoubleType, true),
StructField("name", StringType, true),
StructField("path", ArrayType(StringType), true)
)
)
def vertexProgram(vd: Row, msg:Row): (Double, String, WrappedArray[String]) ={
if (msg == null || vd(0).asInstanceOf[Double] < msg(0).asInstanceOf[Double])
{
(vd(0).asInstanceOf[Double], vd(1).asInstanceOf[String], vd(2).asInstanceOf[WrappedArray[String]])
}
else
{
(msg(0).asInstanceOf[Double], vd(1).asInstanceOf[String], msg(2).asInstanceOf[WrappedArray[String]])
}
}
val vertexProgramUdf = udf(vertexProgram _)
def sendMsgToDst(src:Row, dst:Row): (Double, String, WrappedArray[String]) = {
val srcDist = src(0)
val dstDist = dst(0)
if (srcDist.asInstanceOf[Double] < (dstDist.asInstanceOf[Double] - 1))
{
(srcDist.asInstanceOf[Double] + 1, src(1).asInstanceOf[String], src(2).asInstanceOf[WrappedArray[String]] :+ dst(1).asInstanceOf[String])
}
else {
null
}
}
val sendMsgToDstUdf = udf(sendMsgToDst _)
def aggMsgs(agg: WrappedArray[Row]): (Double, String, WrappedArray[String]) = {
(agg(0)(0).asInstanceOf[Double], agg(0)(1).asInstanceOf[String], agg(0)(2).asInstanceOf[WrappedArray[String]])
}
val aggMsgsUdf = udf(aggMsgs _)
val dbl:Double = 0.0
val result = graph.pregel.setMaxIter(3).withVertexColumn(colName = "vertCol",
initialExpr = when(col("id")===(lit(root_node)), struct(lit(dbl), col("id"), array(col("id"))))
.otherwise(struct(lit(Double.PositiveInfinity), col("id"), array(lit("")))).cast(vertColSchema),
updateAfterAggMsgsExpr = vertexProgramUdf(col("vertCol"), Pregel.msg)).sendMsgToDst(sendMsgToDstUdf(col("src.vertCol"), Pregel.dst("vertCol"))).aggMsgs(aggMsgsUdf(collect_list(Pregel.msg))).run()
scala> result.show()
+---+-------+---------+--------+--------+-------------------+
| id| name|outDegree|inDegree|nodeType| vertCol|
+---+-------+---------+--------+--------+-------------------+
| 1| Alice| null| 2| leaf|[2.0, 1, [5, 2, 1]]|
| 2| Bob| 2| 2| child| [1.0, 2, [5, 2]]|
| 3|Charlie| 2| 1| child| [1.0, 3, [5, 3]]|
| 4| David| 1| 1| child|[2.0, 4, [5, 2, 4]]|
| 5| Ed| 3| null| root| [0.0, 5, [5]]|
| 6| Fran| null| 2| leaf| [1.0, 6, [5, 6]]|
+---+-------+---------+--------+--------+-------------------+
下面的代码运行良好,它打印出两个顶点之间的最短路径长度。但是如何打印出两个顶点之间的真实路径或细节边(不仅是长度)?
val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")
val sc = new SparkContext(conf)
val vertexArray = Array(
(1L, ("Alice1", 28)),
(2L, ("Bob2", 27)),
(3L, ("Charlie3", 65)),
(4L, ("David4", 42)),
(5L, ("Ed5", 55)),
(6L, ("Fran6", 50))
)
val edgeArray = Array(
Edge(2L, 1L, 1),
Edge(2L, 4L, 1),
Edge(3L, 2L, 1),
Edge(3L, 6L, 1),
Edge(4L, 1L, 1),
Edge(5L, 2L, 1),
Edge(5L, 3L, 1),
Edge(5L, 6L, 1)
)
val sourceId : VertexId = 5L;
val initialGraph = graph.mapVertices(
(id,_) => if(id==sourceId)0.0 else Double.PositiveInfinity
)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist),
triplet =>{
if(triplet.srcAttr + triplet.attr < triplet.dstAttr){
Iterator((triplet.dstId, triplet.srcAttr+triplet.attr))
}else{
Iterator.empty
}
},
(a,b) => math.min(a,b)
)
println(sssp.vertices.collect.mkString("\n"))
代码在运行时,输出了VertexId=5和其他顶点的最短路径长度,如下:
(4,2.0)
(1,2.0)
(6,1.0)
(3,1.0)
(5,0.0)
(2,1.0)
例如结果(4,2.0)表示顶点5和顶点4之间的最短路径长度为2。但我希望它能打印出详细路径,如:5->2->4 .
您可以使用 graphFrame。
import org.apache.spark.sql.types.{ArrayType, StringType, StructType, StructField, DoubleType}
import org.graphframes._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.graphframes.lib.Pregel
import org.apache.spark.sql.functions.struct
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.Row
var vertex_df = spark.read.format("csv").option("header", "true").load("/home/tm/Documents/vertex1.csv")
val edge_df = spark.read.format("csv").option("header", "true").load("/home/tm/Documents/edge1.csv")
spark.sparkContext.setCheckpointDir("/home/tm/checkpoints")
var graph = GraphFrame(vertex_df, edge_df)
val inDegrees=graph.inDegrees
val outDegrees=graph.outDegrees
vertex_df = vertex_df.join(outDegrees,outDegrees("id") === vertex_df("id"), "left").join(inDegrees, inDegrees("id") === vertex_df("id"), "left").select(vertex_df("id"), vertex_df("name"), outDegrees("outDegree"), inDegrees("inDegree"))
vertex_df = vertex_df.withColumn("nodeType", when(col("inDegree").isNull,"root").otherwise(when(col("outDegree").isNull,"leaf").otherwise("child")))
graph = GraphFrame(vertex_df, edge_df)
val root_node = 5
val vertColSchema = StructType(
List(
StructField("dist", DoubleType, true),
StructField("name", StringType, true),
StructField("path", ArrayType(StringType), true)
)
)
def vertexProgram(vd: Row, msg:Row): (Double, String, WrappedArray[String]) ={
if (msg == null || vd(0).asInstanceOf[Double] < msg(0).asInstanceOf[Double])
{
(vd(0).asInstanceOf[Double], vd(1).asInstanceOf[String], vd(2).asInstanceOf[WrappedArray[String]])
}
else
{
(msg(0).asInstanceOf[Double], vd(1).asInstanceOf[String], msg(2).asInstanceOf[WrappedArray[String]])
}
}
val vertexProgramUdf = udf(vertexProgram _)
def sendMsgToDst(src:Row, dst:Row): (Double, String, WrappedArray[String]) = {
val srcDist = src(0)
val dstDist = dst(0)
if (srcDist.asInstanceOf[Double] < (dstDist.asInstanceOf[Double] - 1))
{
(srcDist.asInstanceOf[Double] + 1, src(1).asInstanceOf[String], src(2).asInstanceOf[WrappedArray[String]] :+ dst(1).asInstanceOf[String])
}
else {
null
}
}
val sendMsgToDstUdf = udf(sendMsgToDst _)
def aggMsgs(agg: WrappedArray[Row]): (Double, String, WrappedArray[String]) = {
(agg(0)(0).asInstanceOf[Double], agg(0)(1).asInstanceOf[String], agg(0)(2).asInstanceOf[WrappedArray[String]])
}
val aggMsgsUdf = udf(aggMsgs _)
val dbl:Double = 0.0
val result = graph.pregel.setMaxIter(3).withVertexColumn(colName = "vertCol",
initialExpr = when(col("id")===(lit(root_node)), struct(lit(dbl), col("id"), array(col("id"))))
.otherwise(struct(lit(Double.PositiveInfinity), col("id"), array(lit("")))).cast(vertColSchema),
updateAfterAggMsgsExpr = vertexProgramUdf(col("vertCol"), Pregel.msg)).sendMsgToDst(sendMsgToDstUdf(col("src.vertCol"), Pregel.dst("vertCol"))).aggMsgs(aggMsgsUdf(collect_list(Pregel.msg))).run()
scala> result.show()
+---+-------+---------+--------+--------+-------------------+
| id| name|outDegree|inDegree|nodeType| vertCol|
+---+-------+---------+--------+--------+-------------------+
| 1| Alice| null| 2| leaf|[2.0, 1, [5, 2, 1]]|
| 2| Bob| 2| 2| child| [1.0, 2, [5, 2]]|
| 3|Charlie| 2| 1| child| [1.0, 3, [5, 3]]|
| 4| David| 1| 1| child|[2.0, 4, [5, 2, 4]]|
| 5| Ed| 3| null| root| [0.0, 5, [5]]|
| 6| Fran| null| 2| leaf| [1.0, 6, [5, 6]]|
+---+-------+---------+--------+--------+-------------------+